diff options
author | spuchin <spuchin@ydb.tech> | 2022-11-01 19:24:05 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-11-01 19:24:05 +0300 |
commit | 89a1197513c1d767967e489061db3fc25c56f890 (patch) | |
tree | 65f815bfe1e75fb5a4f0dfbcb9b37ee5a9f7809d | |
parent | 31f51049ff883f69143f6618b930f26569d8c727 (diff) | |
download | ydb-89a1197513c1d767967e489061db3fc25c56f890.tar.gz |
Remove engine selection code from kqp_worker. ()
-rw-r--r-- | ydb/core/kqp/common/kqp_transform.h | 19 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_tx_info.h | 7 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_runner.cpp | 131 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 609 | ||||
-rw-r--r-- | ydb/core/kqp/ut/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/fat/CMakeLists.darwin.txt | 52 | ||||
-rw-r--r-- | ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt | 54 | ||||
-rw-r--r-- | ydb/core/kqp/ut/fat/CMakeLists.linux.txt | 56 | ||||
-rw-r--r-- | ydb/core/kqp/ut/fat/CMakeLists.txt | 15 | ||||
-rw-r--r-- | ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp | 1042 |
14 files changed, 41 insertions, 1963 deletions
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h index 309b67951a8..856fd8ef760 100644 --- a/ydb/core/kqp/common/kqp_transform.h +++ b/ydb/core/kqp/common/kqp_transform.h @@ -221,27 +221,10 @@ public: DeferredEffects.Clear(); ParamsState = MakeIntrusive<TParamsState>(); SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot; - ForceNewEngineSettings = {}; } TKqpTransactionInfo GetInfo() const; - void ForceOldEngine() { - auto engine = DeferredEffects.GetEngine(); - YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::OldEngine); - YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine || *ForceNewEngineSettings.ForcedNewEngine == false); - ForceNewEngineSettings.ForcedNewEngine = false; - } - - void ForceNewEngine(ui32 percent, ui32 level) { - auto engine = DeferredEffects.GetEngine(); - YQL_ENSURE(!engine || engine == TKqpTransactionInfo::EEngine::NewEngine); - YQL_ENSURE(!ForceNewEngineSettings.ForcedNewEngine.has_value()); - ForceNewEngineSettings.ForcedNewEngine = true; - ForceNewEngineSettings.ForceNewEnginePercent = percent; - ForceNewEngineSettings.ForceNewEngineLevel = level; - } - void SetIsolationLevel(const Ydb::Table::TransactionSettings& settings) { switch (settings.tx_mode_case()) { case Ydb::Table::TransactionSettings::kSerializableReadWrite: @@ -296,8 +279,6 @@ public: TIntrusivePtr<TParamsState> ParamsState; IKqpGateway::TKqpSnapshotHandle SnapshotHandle; - - TKqpForceNewEngineState ForceNewEngineSettings; }; class TLogExprTransformer { diff --git a/ydb/core/kqp/common/kqp_tx_info.h b/ydb/core/kqp/common/kqp_tx_info.h index a3e57414ae6..0c26981bd3f 100644 --- a/ydb/core/kqp/common/kqp_tx_info.h +++ b/ydb/core/kqp/common/kqp_tx_info.h @@ -7,12 +7,6 @@ namespace NKikimr { namespace NKqp { -struct TKqpForceNewEngineState { - ui32 ForceNewEnginePercent = 0; - ui32 ForceNewEngineLevel = 0; - std::optional<bool> ForcedNewEngine; -}; - struct TKqpTransactionInfo { enum class EKind { Pure, @@ -39,7 +33,6 @@ public: TDuration TotalDuration; TDuration ServerDuration; ui32 QueriesCount = 0; - TKqpForceNewEngineState ForceNewEngineState; }; } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index ec37ab7f42c..349ee4d49f0 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1183,15 +1183,15 @@ public: } void ForceTxOldEngine(const TString& txId) override { - if (auto tx = FindTransaction(txId)) { - (*tx)->ForceOldEngine(); - } - } + Y_UNUSED(txId); + Y_ENSURE(false, "Unsupported ForceTxOldEngine"); + } void ForceTxNewEngine(const TString& txId, ui32 percent, ui32 level) override { - if (auto tx = FindTransaction(txId)) { - (*tx)->ForceNewEngine(percent, level); - } + Y_UNUSED(txId); + Y_UNUSED(percent); + Y_UNUSED(level); + Y_ENSURE(false, "Unsupported ForceTxNewEngine"); } TMaybe<TKqpTransactionInfo> GetTransactionInfo(const TString& txId) override { @@ -2292,7 +2292,6 @@ TKqpTransactionInfo TKqpTransactionContext::GetInfo() const { txInfo.QueriesCount = QueriesCount; txInfo.TxEngine = DeferredEffects.GetEngine(); - txInfo.ForceNewEngineState = ForceNewEngineSettings; return txInfo; } diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index fb41511d93f..c3a2196a66b 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -25,19 +25,6 @@ using namespace NThreading; namespace { -void FillAstAndPlan(NKikimrKqp::TPreparedKql& kql, const TExprNode::TPtr& queryExpr, TExprContext& ctx) { - TStringStream astStream; - auto ast = ConvertToAst(*queryExpr, ctx, TExprAnnotationFlags::None, true); - ast.Root->PrettyPrintTo(astStream, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine); - kql.SetAst(astStream.Str()); - - NJsonWriter::TBuf writer; - writer.SetIndentSpaces(2); - - WriteKqlPlan(writer, queryExpr); - kql.SetPlan(writer.Str()); -} - class TAsyncRunResult : public TKqpAsyncResultBase<IKikimrQueryExecutor::TQueryResult, false> { public: using TResult = IKikimrQueryExecutor::TQueryResult; @@ -325,12 +312,6 @@ public: YQL_ENSURE(cluster == Cluster); YQL_ENSURE(phyQuery->GetType() == NKqpProto::TKqpPhyQuery::TYPE_DATA); - if (!Config->HasAllowKqpNewEngine()) { - ctx.AddError(TIssue(TPosition(), "NewEngine execution is not allowed on this cluster.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>( - ctx.IssueManager.GetIssues())); - } - return ExecutePhysicalDataQuery(world, std::move(phyQuery), ctx, settings); } @@ -391,111 +372,7 @@ private: } bool sysColumnsEnabled = TransformCtx->Config->SystemColumnsEnabled(); - - std::optional<TKqpTransactionInfo::EEngine> engine; - if (settings.UseNewEngine.Defined()) { - engine = *settings.UseNewEngine - ? TKqpTransactionInfo::EEngine::NewEngine - : TKqpTransactionInfo::EEngine::OldEngine; - } - if (!engine.has_value() && Config->UseNewEngine.Get().Defined()) { - engine = Config->UseNewEngine.Get().Get() - ? TKqpTransactionInfo::EEngine::NewEngine - : TKqpTransactionInfo::EEngine::OldEngine; - } - if (!engine.has_value() && Config->HasKqpForceNewEngine()) { - engine = TKqpTransactionInfo::EEngine::NewEngine; - } - - if ((queryCtx->Type == EKikimrQueryType::Scan) || - (engine.has_value() && *engine == TKqpTransactionInfo::EEngine::NewEngine)) - { - return PrepareQueryNewEngine(cluster, dataQuery, ctx, settings, sysColumnsEnabled); - } - - YQL_ENSURE(false, "Unexpected query prepare in OldEngine mode."); - - // OldEngine only - YQL_ENSURE(!engine.has_value() || *engine == TKqpTransactionInfo::EEngine::OldEngine); - - for (const auto& [name, table] : TransformCtx->Tables->GetTables()) { - if (!table.Metadata->SysView.empty()) { - ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), TStringBuilder() - << "Table " << table.Metadata->Name << " is a system view. " - << "System views are not supported by data queries." - )); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - } - - auto program = BuildKiProgram(dataQuery, *TransformCtx->Tables, ctx, sysColumnsEnabled); - - KqlOptimizeTransformer->Rewind(); - - TExprNode::TPtr optimizedProgram = program.Ptr(); - auto status = InstantTransform(*KqlOptimizeTransformer, optimizedProgram, ctx); - if (status != IGraphTransformer::TStatus::Ok || !TMaybeNode<TKiProgram>(optimizedProgram)) { - ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), "Failed to optimize KQL query.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - YQL_ENSURE(optimizedProgram->GetTypeAnn()); - - queryCtx->QueryTraits = CollectQueryTraits(TKiProgram(optimizedProgram), ctx); - - KqlTypeAnnTransformer->Rewind(); - - TExprNode::TPtr finalProgram; - bool hasNonDeterministicFunctions; - TPeepholeSettings peepholeSettings; - peepholeSettings.WithNonDeterministicRules = false; - status = PeepHoleOptimizeNode<false>(optimizedProgram, finalProgram, ctx, TypesCtx, KqlTypeAnnTransformer.Get(), - hasNonDeterministicFunctions, peepholeSettings); - if (status != IGraphTransformer::TStatus::Ok) { - ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), "Failed to peephole optimize KQL query.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - status = ReplaceNonDetFunctionsWithParams(finalProgram, ctx); - if (status != IGraphTransformer::TStatus::Ok) { - ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), - "Failed to replace non deterministic functions with params for KQL query.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - KqlPrepareTransformer->Rewind(); - - NKikimrKqp::TKqlSettings kqlSettings; - kqlSettings.SetCommitTx(settings.CommitTx); - kqlSettings.SetRollbackTx(settings.RollbackTx); - - TransformCtx->Reset(); - TransformCtx->Settings = kqlSettings; - - if (!TxState->Tx().EffectiveIsolationLevel) { - TxState->Tx().EffectiveIsolationLevel = kqlSettings.GetIsolationLevel(); - } - - TransformCtx->QueryCtx->PreparingQuery->SetVersion(NKikimrKqp::TPreparedQuery::VERSION_V1); - auto kql = TransformCtx->QueryCtx->PreparingQuery->AddKqls(); - kql->MutableSettings()->CopyFrom(TransformCtx->Settings); - - FillAstAndPlan(*kql, finalProgram, ctx); - - auto operations = TableOperationsToProto(dataQuery.Operations(), ctx); - for (auto& op : operations) { - const auto tableName = op.GetTable(); - auto operation = static_cast<TYdbOperation>(op.GetOperation()); - - *kql->AddOperations() = std::move(op); - - const auto& desc = TransformCtx->Tables->GetTable(cluster, tableName); - TableDescriptionToTableInfo(desc, operation, *kql->MutableTableInfo()); - } - - TransformCtx->PreparingKql = kql; - - return MakeIntrusive<TAsyncRunResult>(finalProgram, ctx, *KqlPrepareTransformer, *TransformCtx); + return PrepareQueryNewEngine(cluster, dataQuery, ctx, settings, sysColumnsEnabled); } TIntrusivePtr<TAsyncQueryResult> PrepareQueryNewEngine(const TString& cluster, const TKiDataQuery& dataQuery, @@ -515,12 +392,6 @@ private: YQL_ENSURE(false, "PrepareQueryNewEngine, unexpected query type: " << queryType); } - if (!Config->HasAllowKqpNewEngine() && queryType == EKikimrQueryType::Dml) { - ctx.AddError(TIssue(ctx.GetPosition(dataQuery.Pos()), - "NewEngine execution is not allowed on this cluster.")); - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - auto kqlQuery = BuildKqlQuery(dataQuery, *TransformCtx->Tables, ctx, sysColumnsEnabled, OptimizeCtx); if (!kqlQuery) { return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index f9b3333bc29..06883ec1773 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -95,7 +95,6 @@ bool HasSchemeOrFatalIssues(const NYql::TIssues& issues); // for tests only void FailForcedNewEngineCompilationForTests(bool fail = true); -void FailForcedNewEngineExecutionForTests(bool fail = true); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index a3fc2966b59..3b2a3dd8e3d 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -34,11 +34,6 @@ using namespace NYql; using namespace NYql::NDq; using namespace NRuCalc; -static std::atomic<bool> FailForcedNewEngineExecution = false; -void FailForcedNewEngineExecutionForTests(bool fail) { - FailForcedNewEngineExecution = fail; -} - namespace { using TQueryResult = IKqpHost::TQueryResult; @@ -63,13 +58,6 @@ struct TKqpQueryState { NKqpProto::TKqpStatsCompile CompileStats; ui32 ReplyFlags = 0; bool KeepSession = false; - bool InteractiveTx = true; - - bool OldEngineFallback = false; - bool NewEngineCompatibleQuery = false; - - TKqpForceNewEngineState ForceNewEngineState; - std::optional<TQueryTraits> QueryTraits; TMaybe<NKikimrKqp::TRlPath> RlPath; }; @@ -198,13 +186,6 @@ public: Y_UNUSED(ctx); } - void HandleReady(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - - LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId) - << "Unexpected compile response while in Ready state."); - } - void HandleReady(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) { ui64 proxyRequestId = ev->Cookie; auto& event = ev->Get()->Record; @@ -295,17 +276,39 @@ public: return; } + // Most of the queries should be executed directly via session_actor switch (queryRequest.GetAction()) { - case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: - case NKikimrKqp::QUERY_ACTION_BEGIN_TX: - case NKikimrKqp::QUERY_ACTION_COMMIT_TX: - case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: + case NKikimrKqp::QUERY_ACTION_EXECUTE: + case NKikimrKqp::QUERY_ACTION_EXPLAIN: + case NKikimrKqp::QUERY_ACTION_VALIDATE: + case NKikimrKqp::QUERY_ACTION_PARSE: break; + default: - if (!queryRequest.HasType()) { - onBadRequest("Query type not specified"); + onError(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << + "Unexpected query action type in KQP worker: " << (ui32)queryRequest.GetAction()); + return; + } + + if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_EXECUTE) { + switch (queryRequest.GetType()) { + case NKikimrKqp::QUERY_TYPE_SQL_DDL: + case NKikimrKqp::QUERY_TYPE_AST_DML: + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: + case NKikimrKqp::QUERY_TYPE_AST_SCAN: + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: + break; + + default: + onError(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << + "Unexpected execute query type in KQP worker: " << (ui32)queryRequest.GetType()); return; - } + } + } + + if (!queryRequest.HasType()) { + onBadRequest("Query type not specified"); + return; } switch (queryRequest.GetAction()) { @@ -323,71 +326,11 @@ public: break; } - HandleQueryRequest(timer, false, ctx); + HandleQueryRequest(timer, ctx); } - void HandleQueryRequest(NCpuTime::TCpuTimer& timer, bool fallbackToOldEngine, const TActorContext& ctx) { - auto& queryRequest = QueryState->Request; - - if (fallbackToOldEngine) { - QueryState->ForceNewEngineState.ForcedNewEngine = false; - QueryState->OldEngineFallback = true; - } - - auto replyError = [this, &ctx] (NYql::EYqlIssueCode status, const TString& info) { - QueryState->AsyncQueryResult = MakeKikimrResultHolder(NCommon::ResultFromError<TQueryResult>( - YqlIssue(TPosition(), status, info))); - - ContinueQueryProcess(ctx); - Become(&TKqpWorkerActor::PerformQueryState); - }; - - if (queryRequest.HasTxControl()) { - const auto& txControl = queryRequest.GetTxControl(); - - switch (txControl.tx_selector_case()) { - case Ydb::Table::TransactionControl::kTxId: { - QueryState->TxId = txControl.tx_id(); - - auto txInfo = KqpHost->GetTransactionInfo(QueryState->TxId); - if (!txInfo) { - replyError(TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() << "Transaction not found: " << QueryState->TxId); - return; - } - - YQL_ENSURE(!QueryState->OldEngineFallback); - QueryState->ForceNewEngineState = txInfo->ForceNewEngineState; - break; - } - - case Ydb::Table::TransactionControl::kBeginTx: { - if (txControl.commit_tx()) { - QueryState->InteractiveTx = false; - } - if (QueryState->OldEngineFallback) { - YQL_ENSURE(!QueryState->InteractiveTx); - } - break; - } - - case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: { - replyError(TIssuesIds::KIKIMR_BAD_REQUEST, TStringBuilder() << "wrong TxControl: tx_selector must be set"); - return; - } - } - } else { - // some kind of internal query? or verify here? - } - + void HandleQueryRequest(NCpuTime::TCpuTimer& timer, const TActorContext& ctx) { StopIdleTimer(ctx); - - if (CompileQuery(ctx)) { - if (QueryState) { - QueryState->CpuTime += timer.GetTime(); - } - return; - } - PerformQuery(ctx); if (QueryState) { QueryState->CpuTime += timer.GetTime(); @@ -422,105 +365,10 @@ public: } } - void HandleCompileQuery(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) { - auto compileResult = ev->Get()->CompileResult; - - Y_VERIFY(compileResult); - Y_VERIFY(QueryState); - - if (compileResult->Status != Ydb::StatusIds::SUCCESS) { - if (ReplyQueryCompileError(compileResult, ctx)) { - StartIdleTimer(ctx); - Become(&TKqpWorkerActor::ReadyState); - } else { - FinalCleanup(ctx); - } - - return; - } - - QueryState->QueryTraits = compileResult->QueryTraits; - - if (!QueryState->ForceNewEngineState.ForcedNewEngine.has_value()) { - // first query in tx - QueryState->ForceNewEngineState.ForceNewEnginePercent = ev->Get()->ForceNewEnginePercent; - QueryState->ForceNewEngineState.ForceNewEngineLevel = ev->Get()->ForceNewEngineLevel; - } - - auto& queryRequest = QueryState->Request; - - QueryState->CompileStats.Swap(&ev->Get()->Stats); - - if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) { - if (ReplyPrepareResult(compileResult, ctx)) { - StartIdleTimer(ctx); - Become(&TKqpWorkerActor::ReadyState); - } else { - FinalCleanup(ctx); - } - - return; - } - - QueryState->QueryCompileResult = compileResult; - - switch (queryRequest.GetAction()) { - case NKikimrKqp::QUERY_ACTION_EXECUTE: - case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: - break; - - default: - Y_VERIFY_S(false, "Unexpected action on successful compile result: " - << NKikimrKqp::EQueryAction_Name(queryRequest.GetAction())); - break; - } - - NCpuTime::TCpuTimer timer; - PerformQuery(ctx); - - // PerformQuery can reset QueryState - if (QueryState) { - QueryState->CpuTime += timer.GetTime(); - } - } - - void HandleCompileQuery(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) { - ReplyBusy(ev, ctx); - } - - void HandleCompileQuery(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - - Y_VERIFY(QueryState); - ReplyProcessError(QueryState->Sender, QueryState->ProxyRequestId, - TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION, - "Session is being closed", ctx); - - Counters->ReportWorkerClosedError(Settings.DbCounters); - FinalCleanup(ctx); - } - - void HandleCompileQuery(TEvKqp::TEvPingSessionRequest::TPtr &ev, const TActorContext &ctx) { - ui64 proxyRequestId = ev->Cookie; - ReplyPingStatus(ev->Sender, proxyRequestId, false, ctx); - } - - void HandleCompileQuery(TEvKqp::TEvIdleTimeout::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - Y_UNUSED(ctx); - } - void HandlePerformQuery(TEvKqp::TEvQueryRequest::TPtr &ev, const TActorContext &ctx) { ReplyBusy(ev, ctx); } - void HandlePerformQuery(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - - LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId) - << "Unexpected compile response while in PerformQuery state."); - } - void HandlePerformQuery(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); Y_UNUSED(ctx); @@ -543,46 +391,6 @@ public: if (ev->Get()->Finished) { QueryState->QueryResult = QueryState->AsyncQueryResult->GetResult(); QueryState->AsyncQueryResult.Reset(); - - if (!QueryState->OldEngineFallback) { - auto& x = QueryState->ForceNewEngineState; - if (x.ForcedNewEngine && *x.ForcedNewEngine) { - auto status = GetYdbStatus(QueryState->QueryResult); - bool failForTests = FailForcedNewEngineExecution.load(std::memory_order_relaxed); - - if (status != Ydb::StatusIds::SUCCESS || failForTests) { - if (x.ForceNewEngineLevel == 0 || x.ForceNewEngineLevel == 1) { - bool shouldFallback = status != Ydb::StatusIds::CANCELLED - && status != Ydb::StatusIds::ABORTED - && status != Ydb::StatusIds::OVERLOADED - && status != Ydb::StatusIds::PRECONDITION_FAILED - && status != Ydb::StatusIds::UNAVAILABLE - && status != Ydb::StatusIds::UNDETERMINED; - if (shouldFallback) { - QueryState->ForceNewEngineState = {}; - QueryState->NewEngineCompatibleQuery = false; - - GetServiceCounters(AppData()->Counters, "kqp")->GetCounter("Requests/OldEngineFallback", true)->Inc(); - - LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, "OldEngine fallback request" - << ", satus: " << Ydb::StatusIds::StatusCode_Name(status) - << ", issues: " << QueryState->QueryResult.Issues().ToString()); - - NCpuTime::TCpuTimer timer; - HandleQueryRequest(timer, true, ctx); - return; - } - } - - if (failForTests) { - QueryState->QueryResult.SetStatus(NYql::TIssuesIds::DEFAULT_ERROR); - QueryState->QueryResult.AddIssue(YqlIssue(TPosition(), TIssuesIds::DEFAULT_ERROR, "Failed for test.")); - GetServiceCounters(AppData()->Counters, "kqp")->GetCounter("Requests/ForceNewEngineExecError", true)->Inc(); - } - } - } - } - QueryCleanup(ctx); } else { NCpuTime::TCpuTimer timer(QueryState->CpuTime); @@ -618,16 +426,6 @@ public: } } - void HandlePerformCleanup(TEvKqp::TEvCompileResponse::TPtr &ev, const TActorContext &ctx) { - Y_UNUSED(ev); - - Y_VERIFY(CleanupState); - if (!CleanupState->Final) { - LOG_ERROR_S(ctx, NKikimrServices::KQP_WORKER, TKqpRequestInfo("", SessionId) - << "Unexpected compile response while in PerformCleanup state."); - } - } - void HandlePerformCleanup(TEvKqp::TEvCloseSessionRequest::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); Y_UNUSED(ctx); @@ -684,7 +482,6 @@ public: try { switch (ev->GetTypeRewrite()) { HFunc(TEvKqp::TEvQueryRequest, HandleReady); - HFunc(TEvKqp::TEvCompileResponse, HandleReady); HFunc(TEvKqp::TEvCloseSessionRequest, HandleReady); HFunc(TEvKqp::TEvPingSessionRequest, HandleReady); HFunc(TEvKqp::TEvContinueProcess, HandleReady); @@ -699,29 +496,10 @@ public: } } - STFUNC(CompileQueryState) { - try { - switch (ev->GetTypeRewrite()) { - HFunc(TEvKqp::TEvQueryRequest, HandleCompileQuery); - HFunc(TEvKqp::TEvCompileResponse, HandleCompileQuery); - HFunc(TEvKqp::TEvCloseSessionRequest, HandleCompileQuery); - HFunc(TEvKqp::TEvPingSessionRequest, HandleCompileQuery); - HFunc(TEvKqp::TEvIdleTimeout, HandleCompileQuery); - HFunc(TEvKqp::TEvInitiateSessionShutdown, HandleInitiateShutdown); - HFunc(TEvKqp::TEvContinueShutdown, HandleContinueShutdown); - default: - UnexpectedEvent("CompileQueryState", ev, ctx); - } - } catch (const yexception& ex) { - InternalError(ex.what(), ctx); - } - } - STFUNC(PerformQueryState) { try { switch (ev->GetTypeRewrite()) { HFunc(TEvKqp::TEvQueryRequest, HandlePerformQuery); - HFunc(TEvKqp::TEvCompileResponse, HandlePerformQuery); HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformQuery); HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformQuery); HFunc(TEvKqp::TEvContinueProcess, HandlePerformQuery); @@ -740,7 +518,6 @@ public: try { switch (ev->GetTypeRewrite()) { HFunc(TEvKqp::TEvQueryRequest, HandlePerformCleanup); - HFunc(TEvKqp::TEvCompileResponse, HandlePerformCleanup); HFunc(TEvKqp::TEvCloseSessionRequest, HandlePerformCleanup); HFunc(TEvKqp::TEvPingSessionRequest, HandlePerformCleanup); HFunc(TEvKqp::TEvContinueProcess, HandlePerformCleanup); @@ -822,64 +599,6 @@ private: return KqpHost->BeginTransaction(isolation, readonly); } - bool CompileQuery(const TActorContext& ctx) { - if (!Settings.LongSession) { - return false; - } - - Y_VERIFY(QueryState); - auto& queryRequest = QueryState->Request; - - switch (queryRequest.GetType()) { - case NKikimrKqp::QUERY_TYPE_SQL_DML: - case NKikimrKqp::QUERY_TYPE_PREPARED_DML: - break; - - default: - return false; - } - - TMaybe<TKqpQueryId> query; - TMaybe<TString> uid; - - bool keepInCache = false; - switch (queryRequest.GetAction()) { - case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false); - keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache(); - break; - - case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false); - keepInCache = true; - break; - - case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: - uid = queryRequest.GetPreparedQuery(); - keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache(); - break; - - default: - return false; - } - - if (query) { - query->Settings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType); - } - - auto compileDeadline = QueryState->QueryDeadlines.TimeoutAt; - if (QueryState->QueryDeadlines.CancelAt) { - compileDeadline = Min(compileDeadline, QueryState->QueryDeadlines.CancelAt); - } - - auto compileRequestActor = CreateKqpCompileRequestActor(ctx.SelfID, QueryState->UserToken, uid, - std::move(query), keepInCache, compileDeadline, Settings.DbCounters); - ctx.ExecutorThread.RegisterActor(compileRequestActor); - - Become(&TKqpWorkerActor::CompileQueryState); - return true; - } - void PerformQuery(const TActorContext& ctx) { Y_VERIFY(QueryState); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); @@ -944,10 +663,6 @@ private: Counters->ReportTxCreated(Settings.DbCounters); QueryState->TxId = beginTxResult.TxId; - if (commit) { - QueryState->InteractiveTx = false; - } - break; } @@ -959,13 +674,6 @@ private: auto action = queryRequest.GetAction(); auto queryType = queryRequest.GetType(); - if (QueryState->QueryCompileResult) { - if (action == NKikimrKqp::QUERY_ACTION_EXECUTE) { - Y_VERIFY(queryType == NKikimrKqp::QUERY_TYPE_SQL_DML); - queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML; - action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED; - } - } switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: { @@ -999,191 +707,8 @@ private: break; } - case NKikimrKqp::QUERY_ACTION_PREPARE: { - if (!PrepareQuery(ctx, queryRequest.GetQuery(), queryType, commit)) { - onBadRequest(QueryState->Error); - return; - } - break; - } - - case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED: { - // NOTE: For compatibility with old clients, remove once not used. - const TString& query = queryRequest.HasPreparedQuery() - ? queryRequest.GetPreparedQuery() - : queryRequest.GetQuery(); - - TPreparedQueryConstPtr preparedQuery; - - if (QueryState->QueryCompileResult) { - Y_VERIFY(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML); - - bool newEngineCompatibleTx = !QueryState->OldEngineFallback - && QueryState->ForceNewEngineState.ForceNewEnginePercent > 0; - - bool forcedOldEngine = false; - - if (newEngineCompatibleTx && - QueryState->ForceNewEngineState.ForcedNewEngine.has_value() && - QueryState->ForceNewEngineState.ForcedNewEngine.value() == false) - { - // newEngineCompatibleTx = false; - forcedOldEngine = true; - } - - QueryState->NewEngineCompatibleQuery = (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine - && newEngineCompatibleTx; - - // select engine according to deferred effects - auto effectsEngine = KqpHost->GetTransactionInfo(QueryState->TxId)->TxEngine; - - if (newEngineCompatibleTx) { - if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 0) { - if (QueryState->InteractiveTx || !QueryState->QueryTraits) { - newEngineCompatibleTx = false; - QueryState->NewEngineCompatibleQuery = false; - } else { - const auto& traits = QueryState->QueryTraits.value(); - if (!traits.ReadOnly || traits.WithJoin || traits.WithSqlIn || traits.WithIndex) { - newEngineCompatibleTx = false; - QueryState->NewEngineCompatibleQuery = false; - } - } - } else if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 1) { - if (QueryState->InteractiveTx || !QueryState->QueryTraits) { - newEngineCompatibleTx = false; - QueryState->NewEngineCompatibleQuery = false; - } else { - const auto& traits = QueryState->QueryTraits.value(); - if (!traits.ReadOnly) { - newEngineCompatibleTx = false; - QueryState->NewEngineCompatibleQuery = false; - } - } - } else if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 2) { - if (!QueryState->QueryTraits.has_value() || !QueryState->QueryTraits->ReadOnly) { - QueryState->NewEngineCompatibleQuery = false; - // but Tx is still NE Compatible, i.e. RO-queries can be executed with NewEngine - } - - if (commit) { - if (effectsEngine) { - if (*effectsEngine == TKqpTransactionInfo::EEngine::NewEngine) { - QueryState->NewEngineCompatibleQuery = true; - } else { - QueryState->NewEngineCompatibleQuery = false; - } - } else { - // Y_VERIFY(false); - } - } - } else if (QueryState->ForceNewEngineState.ForceNewEngineLevel == 3) { - newEngineCompatibleTx = true; - QueryState->NewEngineCompatibleQuery = true; - } else { - YQL_ENSURE(false); - } - } - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_WORKER, "-- NE Compatible: " - << " tx: " << newEngineCompatibleTx - << ", query: " << QueryState->NewEngineCompatibleQuery - << ", interactive: " << QueryState->InteractiveTx - << ", preparedNewEngine: " << (bool) QueryState->QueryCompileResult->PreparedQueryNewEngine - << ", effects: " << (effectsEngine ? (int) *effectsEngine : -1) - << ", forcedNewEngine: " << (QueryState->ForceNewEngineState.ForcedNewEngine - ? ToString(*QueryState->ForceNewEngineState.ForcedNewEngine) - : "<none>") - << ", traits: " << (QueryState->QueryCompileResult->QueryTraits - ? QueryState->QueryCompileResult->QueryTraits->ToString() - : "<none>") - << ", commit: " << commit - << ", text: " << queryRequest.GetQuery()); - - if (newEngineCompatibleTx && !forcedOldEngine) { - if (QueryState->NewEngineCompatibleQuery) { - if (QueryState->ForceNewEngineState.ForcedNewEngine && *QueryState->ForceNewEngineState.ForcedNewEngine) { - preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine; - LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (as part of tx)"); - } else if (!effectsEngine && QueryState->ForceNewEngineState.ForceNewEnginePercent >= RandomNumber((ui32) 100)) { - preparedQuery = QueryState->QueryCompileResult->PreparedQueryNewEngine; - QueryState->ForceNewEngineState.ForcedNewEngine = true; - - KqpHost->ForceTxNewEngine( - QueryState->TxId, - QueryState->ForceNewEngineState.ForceNewEnginePercent, - QueryState->ForceNewEngineState.ForceNewEngineLevel - ); - LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force NewEngine query execution (new tx)"); - } else { - QueryState->ForceNewEngineState.ForcedNewEngine = false; - KqpHost->ForceTxOldEngine(QueryState->TxId); - preparedQuery = QueryState->QueryCompileResult->PreparedQuery; - LOG_INFO_S(ctx, NKikimrServices::KQP_WORKER, "Force OldEngine query execution (new tx)"); - } - } else { - if (!QueryState->ForceNewEngineState.ForcedNewEngine.has_value()) { - QueryState->ForceNewEngineState.ForcedNewEngine = false; - KqpHost->ForceTxOldEngine(QueryState->TxId); - } - - preparedQuery = QueryState->QueryCompileResult->PreparedQuery; - } - } else { - preparedQuery = QueryState->QueryCompileResult->PreparedQuery; - } - } else if (Settings.LongSession) { - onError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Prepared query not found: " << query); - return; - } else { - NKikimrKqp::TPreparedQuery tmp; - if (!tmp.ParseFromString(query)) { - onBadRequest("Failed to parse prepared query."); - return; - } - preparedQuery = std::make_shared<const NKikimrKqp::TPreparedQuery>(std::move(tmp)); - } - - YQL_ENSURE(preparedQuery); - - QueryState->Request.SetQuery(preparedQuery->GetText()); - - if (!ExecutePreparedQuery(preparedQuery, queryType, std::move(*QueryState->Request.MutableParameters()), - commit, GetStatsMode(queryRequest, EKikimrStatsMode::Basic))) - { - onBadRequest(QueryState->Error); - return; - } - break; - } - - case NKikimrKqp::QUERY_ACTION_BEGIN_TX: { - TQueryResult result; - result.SetSuccess(); - QueryState->AsyncQueryResult = MakeKikimrResultHolder(std::move(result)); - ContinueQueryProcess(ctx); - Become(&TKqpWorkerActor::PerformQueryState); - return; - } - - case NKikimrKqp::QUERY_ACTION_COMMIT_TX: { - if (!CommitTx(commit, GetStatsMode(queryRequest, EKikimrStatsMode::Basic))) { - onBadRequest(QueryState->Error); - return; - } - break; - } - - case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: { - if (!RollbackTx(commit)) { - onBadRequest(QueryState->Error); - return; - } - break; - } - default: { - onBadRequest("Unknown query action"); + onBadRequest(TStringBuilder() << "Unknown query action: " << (ui32)queryRequest.GetAction()); return; } } @@ -1273,7 +798,6 @@ private: auto statsMode = GetStatsMode(queryRequest, EKikimrStatsMode::Basic); switch (type) { - case NKikimrKqp::QUERY_TYPE_SQL_DML: case NKikimrKqp::QUERY_TYPE_AST_DML: { bool isSql = (type == NKikimrKqp::QUERY_TYPE_SQL_DML); @@ -1283,7 +807,6 @@ private: execSettings.Deadlines = QueryState->QueryDeadlines; execSettings.Limits = GetQueryLimits(Settings); execSettings.StrictDml = false; - execSettings.UseNewEngine = UseNewEngine(); execSettings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType); QueryState->AsyncQueryResult = KqpHost->ExecuteDataQuery(QueryState->TxId, query, isSql, @@ -1313,7 +836,6 @@ private: break; } - case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_SCAN: { bool isSql = (type == NKikimrKqp::QUERY_TYPE_SQL_SCAN); @@ -1329,7 +851,7 @@ private: } default: { - QueryState->Error = "Unexpected query type."; + QueryState->Error = TStringBuilder() << "Unexpected query type: " << (ui32)type; return false; } } @@ -1393,7 +915,6 @@ private: switch (type) { case NKikimrKqp::QUERY_TYPE_SQL_DML: { IKqpHost::TPrepareSettings prepareSettings; - // prepareSettings.UseNewEngine = use default settings prepareSettings.DocumentApiRestricted = IsDocumentApiRestricted(QueryState->RequestType); QueryState->AsyncQueryResult = KqpHost->PrepareDataQuery(query, prepareSettings); break; @@ -1426,41 +947,6 @@ private: } } - bool CommitTx(bool commit, EKikimrStatsMode statsMode) { - if (!commit) { - QueryState->Error = "Commit should be true for commit transaction request."; - return false; - } - if (QueryState->TxId.empty()) { - QueryState->Error = "Empty tx_id for commit transaction request."; - return false; - } - - IKikimrQueryExecutor::TExecuteSettings execSettings; - execSettings.CommitTx = true; - execSettings.StatsMode = statsMode; - execSettings.Deadlines = QueryState->QueryDeadlines; - execSettings.Limits = GetQueryLimits(Settings); - execSettings.UseNewEngine = UseNewEngine(); - - QueryState->AsyncQueryResult = KqpHost->CommitTransaction(QueryState->TxId, execSettings); - return true; - } - - bool RollbackTx(bool commit) { - if (commit) { - QueryState->Error = "Commit should be false for rollback transaction request."; - return false; - } - if (QueryState->TxId.empty()) { - QueryState->Error = "Empty tx_id for rollback transaction request."; - return false; - } - - QueryState->AsyncQueryResult = KqpHost->RollbackTransaction(QueryState->TxId, CreateRollbackSettings()); - return true; - } - void ContinueQueryProcess(const TActorContext &ctx) { Y_VERIFY(QueryState); @@ -1609,22 +1095,6 @@ private: Counters->ReportQueryReadSets(Settings.DbCounters, queryResult.QueryStats.GetReadSetsCount()); Counters->ReportQueryMaxShardReplySize(Settings.DbCounters, queryResult.QueryStats.GetMaxShardReplySize()); Counters->ReportQueryMaxShardProgramSize(Settings.DbCounters, queryResult.QueryStats.GetMaxShardProgramSize()); - - if (QueryState->QueryCompileResult && QueryState->QueryCompileResult->PreparedQueryNewEngine - && QueryState->NewEngineCompatibleQuery) - { - ui64 computeCpuTimeUs = 0; - for (auto& execution : queryResult.QueryStats.GetExecutions()) { - computeCpuTimeUs += execution.GetCpuTimeUs(); - } - - // query can be executed with NewEngine - if (QueryState->ForceNewEngineState.ForcedNewEngine && *QueryState->ForceNewEngineState.ForcedNewEngine) { - Counters->ReportNewEngineForcedQueryStats(queryRequest.GetAction(), queryDuration, computeCpuTimeUs); - } else { - Counters->ReportNewEngineCompatibleQueryStats(queryRequest.GetAction(), queryDuration, computeCpuTimeUs); - } - } } if (queryResult.SqlVersion) { @@ -2022,23 +1492,10 @@ private: IKikimrQueryExecutor::TExecuteSettings settings; settings.RollbackTx = true; settings.Deadlines.TimeoutAt = TInstant::Now() + TDuration::Minutes(1); - settings.UseNewEngine = UseNewEngine(); return settings; } - TMaybe<bool> UseNewEngine() const { - YQL_ENSURE(QueryState); - - if (auto txInfo = KqpHost->GetTransactionInfo(QueryState->TxId)) { - if (auto engine = txInfo->TxEngine; engine.has_value()) { - return *engine == TKqpTransactionInfo::EEngine::NewEngine; - } - } - - return Nothing(); - } - static TKikimrQueryLimits GetQueryLimits(const TKqpWorkerSettings& settings) { const auto& queryLimitsProto = settings.Service.GetQueryLimits(); const auto& phaseLimitsProto = queryLimitsProto.GetPhaseLimits(); diff --git a/ydb/core/kqp/ut/CMakeLists.darwin.txt b/ydb/core/kqp/ut/CMakeLists.darwin.txt index 38e9bd0e8f2..099ce198dd0 100644 --- a/ydb/core/kqp/ut/CMakeLists.darwin.txt +++ b/ydb/core/kqp/ut/CMakeLists.darwin.txt @@ -7,7 +7,6 @@ add_subdirectory(common) -add_subdirectory(fat) add_subdirectory(pg) add_subdirectory(spilling) diff --git a/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt index 904a04cfa33..4243348b290 100644 --- a/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/CMakeLists.linux-aarch64.txt @@ -7,7 +7,6 @@ add_subdirectory(common) -add_subdirectory(fat) add_subdirectory(pg) add_subdirectory(spilling) diff --git a/ydb/core/kqp/ut/CMakeLists.linux.txt b/ydb/core/kqp/ut/CMakeLists.linux.txt index 6db7d4496db..9e2c302fcb6 100644 --- a/ydb/core/kqp/ut/CMakeLists.linux.txt +++ b/ydb/core/kqp/ut/CMakeLists.linux.txt @@ -7,7 +7,6 @@ add_subdirectory(common) -add_subdirectory(fat) add_subdirectory(pg) add_subdirectory(spilling) diff --git a/ydb/core/kqp/ut/fat/CMakeLists.darwin.txt b/ydb/core/kqp/ut/fat/CMakeLists.darwin.txt deleted file mode 100644 index bfc9ff61e44..00000000000 --- a/ydb/core/kqp/ut/fat/CMakeLists.darwin.txt +++ /dev/null @@ -1,52 +0,0 @@ - -# This file was gererated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_executable(ydb-core-kqp-ut-fat) -target_compile_options(ydb-core-kqp-ut-fat PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) -target_include_directories(ydb-core-kqp-ut-fat PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp -) -target_link_libraries(ydb-core-kqp-ut-fat PUBLIC - contrib-libs-cxxsupp - yutil - library-cpp-cpuid_check - cpp-testing-unittest_main - ydb-core-kqp - cpp-client-ydb_proto - core-kqp-counters - core-kqp-host - core-kqp-provider - kqp-ut-common - yql-sql-pg_dummy -) -target_link_options(ydb-core-kqp-ut-fat PRIVATE - -Wl,-no_deduplicate - -Wl,-sdk_version,10.15 - -fPIC - -fPIC - -framework - CoreFoundation -) -target_sources(ydb-core-kqp-ut-fat PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp -) -add_test( - NAME - ydb-core-kqp-ut-fat - COMMAND - ydb-core-kqp-ut-fat - --print-before-suite - --print-before-test - --fork-tests - --print-times - --show-fails -) -vcs_info(ydb-core-kqp-ut-fat) diff --git a/ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt deleted file mode 100644 index 41a5a981fcf..00000000000 --- a/ydb/core/kqp/ut/fat/CMakeLists.linux-aarch64.txt +++ /dev/null @@ -1,54 +0,0 @@ - -# This file was gererated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_executable(ydb-core-kqp-ut-fat) -target_compile_options(ydb-core-kqp-ut-fat PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) -target_include_directories(ydb-core-kqp-ut-fat PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp -) -target_link_libraries(ydb-core-kqp-ut-fat PUBLIC - contrib-libs-cxxsupp - yutil - library-cpp-lfalloc - cpp-testing-unittest_main - ydb-core-kqp - cpp-client-ydb_proto - core-kqp-counters - core-kqp-host - core-kqp-provider - kqp-ut-common - yql-sql-pg_dummy -) -target_link_options(ydb-core-kqp-ut-fat PRIVATE - -ldl - -lrt - -Wl,--no-as-needed - -fPIC - -fPIC - -lpthread - -lrt - -ldl -) -target_sources(ydb-core-kqp-ut-fat PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp -) -add_test( - NAME - ydb-core-kqp-ut-fat - COMMAND - ydb-core-kqp-ut-fat - --print-before-suite - --print-before-test - --fork-tests - --print-times - --show-fails -) -vcs_info(ydb-core-kqp-ut-fat) diff --git a/ydb/core/kqp/ut/fat/CMakeLists.linux.txt b/ydb/core/kqp/ut/fat/CMakeLists.linux.txt deleted file mode 100644 index 48c75b49f72..00000000000 --- a/ydb/core/kqp/ut/fat/CMakeLists.linux.txt +++ /dev/null @@ -1,56 +0,0 @@ - -# This file was gererated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_executable(ydb-core-kqp-ut-fat) -target_compile_options(ydb-core-kqp-ut-fat PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) -target_include_directories(ydb-core-kqp-ut-fat PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp -) -target_link_libraries(ydb-core-kqp-ut-fat PUBLIC - contrib-libs-cxxsupp - yutil - cpp-malloc-tcmalloc - libs-tcmalloc-no_percpu_cache - library-cpp-cpuid_check - cpp-testing-unittest_main - ydb-core-kqp - cpp-client-ydb_proto - core-kqp-counters - core-kqp-host - core-kqp-provider - kqp-ut-common - yql-sql-pg_dummy -) -target_link_options(ydb-core-kqp-ut-fat PRIVATE - -ldl - -lrt - -Wl,--no-as-needed - -fPIC - -fPIC - -lpthread - -lrt - -ldl -) -target_sources(ydb-core-kqp-ut-fat PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp -) -add_test( - NAME - ydb-core-kqp-ut-fat - COMMAND - ydb-core-kqp-ut-fat - --print-before-suite - --print-before-test - --fork-tests - --print-times - --show-fails -) -vcs_info(ydb-core-kqp-ut-fat) diff --git a/ydb/core/kqp/ut/fat/CMakeLists.txt b/ydb/core/kqp/ut/fat/CMakeLists.txt deleted file mode 100644 index 3e0811fb22e..00000000000 --- a/ydb/core/kqp/ut/fat/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ - -# This file was gererated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) - include(CMakeLists.linux-aarch64.txt) -elseif (APPLE) - include(CMakeLists.darwin.txt) -elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) - include(CMakeLists.linux.txt) -endif() diff --git a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp b/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp deleted file mode 100644 index eb0479aa45c..00000000000 --- a/ydb/core/kqp/ut/fat/kqp_force_newengine_ut.cpp +++ /dev/null @@ -1,1042 +0,0 @@ -#include <ydb/core/kqp/ut/common/kqp_ut_common.h> - -#include <ydb/core/base/counters.h> -#include <ydb/core/kqp/counters/kqp_counters.h> -#include <ydb/core/kqp/kqp_impl.h> - -#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> - -#include <library/cpp/grpc/client/grpc_client_low.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYdb; -using namespace NYdb::NTable; - -class KqpForceNewEngine : public TTestBase { -public: - void SetUp() override { - FailForcedNewEngineCompilationForTests(false); - FailForcedNewEngineExecutionForTests(false); - - TVector<NKikimrKqp::TKqpSetting> settings; - NKikimrKqp::TKqpSetting setting; - setting.SetName("_KqpForceNewEngine"); - setting.SetValue("false"); - settings.push_back(setting); - - TKikimrRunner kikimr(settings); - - Kikimr.reset(new TKikimrRunner(settings)); - Counters = Kikimr->GetTestServer().GetRuntime()->GetAppData(0).Counters; - KqpCounters.reset(new TKqpCounters(Counters)); - } - - void TearDown() override { - FailForcedNewEngineCompilationForTests(false); - FailForcedNewEngineExecutionForTests(false); - } - - TSession Session() { - return Kikimr->GetTableClient().CreateSession().GetValueSync().GetSession(); - } - - void ForceNewEngine(ui32 percent, ui32 level) { - NGrpc::TGRpcClientLow grpcClient; - auto grpcContext = grpcClient.CreateContext(); - - NGrpc::TGRpcClientConfig grpcConfig(Kikimr->GetEndpoint()); - auto grpc = grpcClient.CreateGRpcServiceConnection<NKikimrClient::TGRpcServer>(grpcConfig); - - NKikimrClient::TConsoleRequest request; - auto* action = request.MutableConfigureRequest()->MutableActions()->Add(); - auto* configItem = action->MutableAddConfigItem()->MutableConfigItem(); - configItem->SetKind(NKikimrConsole::TConfigItem::TableServiceConfigItem); - - configItem->MutableConfig()->MutableTableServiceConfig()->SetForceNewEnginePercent(percent); - configItem->MutableConfig()->MutableTableServiceConfig()->SetForceNewEngineLevel(level); - - std::atomic<int> done = 0; - grpc->DoRequest<NKikimrClient::TConsoleRequest, NKikimrClient::TConsoleResponse>( - request, - [&done](NGrpc::TGrpcStatus&& status, NKikimrClient::TConsoleResponse&& response) { - if (status.Ok()) { - if (response.GetStatus().code() != Ydb::StatusIds::SUCCESS) { - done = 3; - } else if (response.GetConfigureResponse().GetStatus().code() != Ydb::StatusIds::SUCCESS) { - done = 4; - } else { - done = 1; - } - } else { - Cerr << "status: " << status.Msg << ", " << status.InternalError << ", " << status.GRpcStatusCode << Endl; - Cerr << response.DebugString() << Endl; - done = 2; - } - }, - &NKikimrClient::TGRpcServer::Stub::AsyncConsoleRequest, - {}, - grpcContext.get()); - - while (done.load() == 0) { - ::Sleep(TDuration::Seconds(1)); - } - grpcContext.reset(); - grpcClient.Stop(true); - - UNIT_ASSERT_VALUES_EQUAL(done.load(), 1); - } - - void TestNotInteractiveReadOnlyTx(ui32 level, bool withSqlIn = false) { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto query = withSqlIn - ? R"( - --!syntax_v1 - DECLARE $values AS List<String>; - SELECT * FROM `/Root/TwoShard` WHERE Value1 IN $values - )" - : R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 1 - )"; - auto params = TParamsBuilder() - .AddParam("$values").BeginList().AddListItem().String("One").EndList().Build().Build(); - auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), std::move(params)).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0))); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(50, level); - test(20); - bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0; - if (level == 0 && withSqlIn) { - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } else { - UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val() - << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - } - - { - ForceNewEngine(100, level); - test(2); - if (level == 0 && withSqlIn) { - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - } else { - UNIT_ASSERT_VALUES_EQUAL(2, KqpCounters->NewEngineForcedQueryCount->Val()); - } - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(0, level); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - } - - void TestNotInteractiveWriteOnlyTx(ui32 level) { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(R"( - REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne") - )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, level); - test(2); - UNIT_ASSERT_VALUES_EQUAL(level == 3 ? 2 : 0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - } - - void TestNotInteractiveReadOnlyTxFailedNECompilation(ui32 level) { - auto session = Session(); - - auto test = [&]() { - KqpCounters->ForceNewEngineCompileErrors->Set(0); - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/KeyValue` LIMIT 1 - )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - }; - - { - ForceNewEngine(100, level); - FailForcedNewEngineCompilationForTests(); - test(); - UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - } - - void TestNotInteractiveReadOnlyTxFallback(ui32 level) { - auto session = Session(); - - auto test = [&]() { - KqpCounters->ForceNewEngineCompileErrors->Set(0); - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/KeyValue` LIMIT 1 - )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - if (level == 0 || level == 1) { - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } else { - UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString()); - } - }; - - { - ForceNewEngine(100, level); - FailForcedNewEngineExecutionForTests(); - test(); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); // failed request is not counted - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(level == 0 || level == 1 ? 1 : 0, GetServiceCounters(Counters, "kqp")->GetCounter("Requests/OldEngineFallback", true)->Val()); - UNIT_ASSERT_VALUES_EQUAL(level == 0 || level == 1 ? 0 : 1, GetServiceCounters(Counters, "kqp")->GetCounter("Requests/ForceNewEngineExecError", true)->Val()); - } - } - - void TestInteractiveReadWriteEx(ui32 level, bool addBeginTx, /* bool prependWrite, */ bool appendRead, bool addCommitTx) { - UNIT_ASSERT(level == 2); - - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto txControl = TTxControl::BeginTx(); - - if (addBeginTx) { - auto tx = session.BeginTransaction(TTxSettings::SerializableRW()).ExtractValueSync().GetTransaction(); - UNIT_ASSERT(tx.IsActive()); - - txControl = TTxControl::Tx(tx); - } - - // TODO: prependWrite - - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/KeyValue` LIMIT 1 - )", txControl).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - txControl = TTxControl::Tx(*result.GetTransaction()); - - if (appendRead || addCommitTx) { - // do nothing - } else { - txControl = txControl.CommitTx(); - } - - result = session.ExecuteDataQuery(Sprintf(R"( - UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1 - )", i), txControl).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - if (appendRead) { - txControl = TTxControl::Tx(*result.GetTransaction()); - if (!addCommitTx) { - txControl = txControl.CommitTx(); - } - - result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/KeyValue` LIMIT 2 - )", txControl).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - if (addCommitTx) { - auto txResult = result.GetTransaction()->Commit().ExtractValueSync(); - UNIT_ASSERT_C(txResult.IsSuccess(), txResult.GetIssues().ToString()); - } - } else { - if (addCommitTx) { - auto txResult = result.GetTransaction()->Commit().ExtractValueSync(); - UNIT_ASSERT_C(txResult.IsSuccess(), txResult.GetIssues().ToString()); - } - } - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(50, level); - test(20); - bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0; - UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val() - << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, level); - test(2); - UNIT_ASSERT_VALUES_EQUAL(2 * (appendRead && addCommitTx ? 2 : 1), KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(0, level); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - } - - - UNIT_TEST_SUITE(KqpForceNewEngine); - UNIT_TEST(Level0_NotInteractiveReadOnly); - UNIT_TEST(Level0_NotInteractiveReadOnlySqlIn); - UNIT_TEST(Level0_NotInteractiveWriteOnly); - UNIT_TEST(Level0_NotInteractiveReadWrite); - UNIT_TEST(Level0_InteractiveReadOnly); - UNIT_TEST(Level0_CompilationFailure); - UNIT_TEST(Level0_Fallback); - - UNIT_TEST(Level1_NotInteractiveReadOnly); - UNIT_TEST(Level1_NotInteractiveReadOnlySqlIn); - UNIT_TEST(Level1_NotInteractiveWriteOnly); - UNIT_TEST(Level1_NotInteractiveReadWrite); - UNIT_TEST(Level1_InteractiveReadOnly); - UNIT_TEST(Level1_CompilationFailure); - UNIT_TEST(Level1_Fallback); - - UNIT_TEST(Level2_NotInteractiveReadOnly); - UNIT_TEST(Level2_NotInteractiveWriteOnly); - UNIT_TEST(Level2_InteractiveReadOnly); - UNIT_TEST(Level2_InteractiveReadWrite); - UNIT_TEST(Level2_InteractiveBeginReadWrite); - UNIT_TEST(Level2_InteractiveReadWriteCommit); - UNIT_TEST(Level2_InteractiveBeginReadWriteCommit); - UNIT_TEST(Level2_InteractiveReadWriteRead); - UNIT_TEST(Level2_InteractiveBeginReadWriteRead); - UNIT_TEST(Level2_InteractiveReadWriteReadCommit); - UNIT_TEST(Level2_InteractiveBeginReadWriteReadCommit); - UNIT_TEST(Level2_InteractiveWriteOnly); - UNIT_TEST(Level2_CompilationFailure); - UNIT_TEST(Level2_NoFallback); - UNIT_TEST(Level2_ActiveRequestRead); - UNIT_TEST(Level2_ActiveRequestWrite); - - UNIT_TEST(Level3_NotInteractiveReadOnly); - UNIT_TEST(Level3_NotInteractiveWriteOnly); - UNIT_TEST(Level3_InteractiveReadOnly); - UNIT_TEST(Level3_InteractiveReadWrite); - UNIT_TEST(Level3_CompilationFailure); - UNIT_TEST(Level3_NoFallback); - UNIT_TEST(Level3_ActiveRequestRead); - UNIT_TEST(Level3_ActiveRequestWrite); - UNIT_TEST_SUITE_END(); - - void Level0_NotInteractiveReadOnly(); - void Level0_NotInteractiveReadOnlySqlIn(); - void Level0_NotInteractiveWriteOnly(); - void Level0_NotInteractiveReadWrite(); - void Level0_InteractiveReadOnly(); - void Level0_CompilationFailure(); - void Level0_Fallback(); - - void Level1_NotInteractiveReadOnly(); - void Level1_NotInteractiveReadOnlySqlIn(); - void Level1_NotInteractiveWriteOnly(); - void Level1_NotInteractiveReadWrite(); - void Level1_InteractiveReadOnly(); - void Level1_CompilationFailure(); - void Level1_Fallback(); - - void Level2_NotInteractiveReadOnly(); - void Level2_NotInteractiveWriteOnly(); - void Level2_InteractiveReadOnly(); - void Level2_InteractiveReadWrite(); - void Level2_InteractiveBeginReadWrite(); - void Level2_InteractiveReadWriteCommit(); - void Level2_InteractiveBeginReadWriteCommit(); - void Level2_InteractiveReadWriteRead(); - void Level2_InteractiveBeginReadWriteRead(); - void Level2_InteractiveReadWriteReadCommit(); - void Level2_InteractiveBeginReadWriteReadCommit(); - void Level2_InteractiveWriteOnly(); - void Level2_CompilationFailure(); - void Level2_NoFallback(); - void Level2_ActiveRequestRead(); - void Level2_ActiveRequestWrite(); - - void Level3_NotInteractiveReadOnly(); - void Level3_NotInteractiveWriteOnly(); - void Level3_InteractiveReadOnly(); - void Level3_InteractiveReadWrite(); - void Level3_CompilationFailure(); - void Level3_NoFallback(); - void Level3_ActiveRequestRead(); - void Level3_ActiveRequestWrite(); - -private: - std::unique_ptr<TKikimrRunner> Kikimr; - ::NMonitoring::TDynamicCounterPtr Counters; - std::unique_ptr<TKqpCounters> KqpCounters; -}; -UNIT_TEST_SUITE_REGISTRATION(KqpForceNewEngine); - -/////////// LEVEL 0 //////////////////////////////////////////////////////////////////////////////////////////////////// -void KqpForceNewEngine::Level0_NotInteractiveReadOnly() { - TestNotInteractiveReadOnlyTx(0); -} - -void KqpForceNewEngine::Level0_NotInteractiveReadOnlySqlIn() { - TestNotInteractiveReadOnlyTx(0, /* withSqlIn */ true); -} - -void KqpForceNewEngine::Level0_NotInteractiveWriteOnly() { - TestNotInteractiveWriteOnlyTx(0); -} - -void KqpForceNewEngine::Level0_NotInteractiveReadWrite() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(Sprintf(R"( - UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1 - )", i), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 0); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level0_InteractiveReadOnly() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 1 - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0))); - - auto tx = *result.GetTransaction(); - - result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 2 - )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0))); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 0); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level0_CompilationFailure() { - TestNotInteractiveReadOnlyTxFailedNECompilation(0); -} - -void KqpForceNewEngine::Level0_Fallback() { - TestNotInteractiveReadOnlyTxFallback(0); -} - -/////////// LEVEL 1 //////////////////////////////////////////////////////////////////////////////////////////////////// -void KqpForceNewEngine::Level1_NotInteractiveReadOnly() { - TestNotInteractiveReadOnlyTx(1); -} - -void KqpForceNewEngine::Level1_NotInteractiveReadOnlySqlIn() { - TestNotInteractiveReadOnlyTx(1, /* withSqlIn */ true); -} - -void KqpForceNewEngine::Level1_NotInteractiveWriteOnly() { - TestNotInteractiveWriteOnlyTx(1); -} - -void KqpForceNewEngine::Level1_NotInteractiveReadWrite() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(Sprintf(R"( - UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1 - )", i), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 1); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level1_InteractiveReadOnly() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 1 - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0))); - - auto tx = *result.GetTransaction(); - - result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 2 - )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0))); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 1); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level1_CompilationFailure() { - TestNotInteractiveReadOnlyTxFailedNECompilation(1); -} - -void KqpForceNewEngine::Level1_Fallback() { - TestNotInteractiveReadOnlyTxFallback(1); -} - -/////////// LEVEL 2 //////////////////////////////////////////////////////////////////////////////////////////////////// -void KqpForceNewEngine::Level2_NotInteractiveReadOnly() { - TestNotInteractiveReadOnlyTx(2); -} - -void KqpForceNewEngine::Level2_NotInteractiveWriteOnly() { - TestNotInteractiveWriteOnlyTx(2); -} - -void KqpForceNewEngine::Level2_InteractiveReadOnly() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 1 - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0))); - - auto tx = *result.GetTransaction(); - - result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 2 - )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0))); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(50, 2); - test(20); - bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0; - UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val() - << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 2); - test(2); - UNIT_ASSERT_VALUES_EQUAL(4, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(0, 2); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level2_InteractiveReadWrite() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ false, /* addCommitTx */ false); -} - -void KqpForceNewEngine::Level2_InteractiveBeginReadWrite() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ false, /* addCommitTx */ false); -} - -void KqpForceNewEngine::Level2_InteractiveReadWriteCommit() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ false, /* addCommitTx */ true); -} - -void KqpForceNewEngine::Level2_InteractiveBeginReadWriteCommit() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ false, /* addCommitTx */ true); -} - -void KqpForceNewEngine::Level2_InteractiveReadWriteRead() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ true, /* addCommitTx */ false); -} - -void KqpForceNewEngine::Level2_InteractiveBeginReadWriteRead() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ true, /* addCommitTx */ false); -} - -void KqpForceNewEngine::Level2_InteractiveReadWriteReadCommit() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ false, /* appendRead */ true, /* addCommitTx */ true); -} - -void KqpForceNewEngine::Level2_InteractiveBeginReadWriteReadCommit() { - TestInteractiveReadWriteEx(/* level */ 2, /* addBeginTx */ true, /* appendRead */ true, /* addCommitTx */ true); -} - -void KqpForceNewEngine::Level2_InteractiveWriteOnly() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(R"( - REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne") - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - result = session.ExecuteDataQuery(R"( - REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (1, "OneOne") - )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 2); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level2_CompilationFailure() { - TestNotInteractiveReadOnlyTxFailedNECompilation(2); -} - -void KqpForceNewEngine::Level2_NoFallback() { - TestNotInteractiveReadOnlyTxFallback(2); -} - -void KqpForceNewEngine::Level2_ActiveRequestRead() { - auto session = Session(); - - // start request with OldEngine - - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` LIMIT 1 - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - - ForceNewEngine(100, 2); - - // can switch to NewEngine (RO-query, no deferred effects) - - result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` LIMIT 1 - )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - - result = session.ExecuteDataQuery(R"( - SELECT 42 - )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(2, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); -} - -void KqpForceNewEngine::Level2_ActiveRequestWrite() { - auto session = Session(); - - // start query with OldEngine - - auto result = session.ExecuteDataQuery(R"( - REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne") - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - - ForceNewEngine(100, 2); - - // have deferred effects, so dont force new engine on active transactions - - result = session.ExecuteDataQuery(R"( - SELECT 42 - )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val()); - - - result = session.ExecuteDataQuery(R"( - SELECT 42 - )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val()); -} - -/////////// LEVEL 3 //////////////////////////////////////////////////////////////////////////////////////////////////// -void KqpForceNewEngine::Level3_NotInteractiveReadOnly() { - TestNotInteractiveReadOnlyTx(3); -} - -void KqpForceNewEngine::Level3_NotInteractiveWriteOnly() { - TestNotInteractiveWriteOnlyTx(3); -} - -void KqpForceNewEngine::Level3_InteractiveReadOnly() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 1 - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[1u];["One"];[-1]]])", FormatResultSetYson(result.GetResultSet(0))); - - auto tx = *result.GetTransaction(); - - result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` WHERE Key = 2 - )", TTxControl::Tx(tx).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[[2u];["Two"];[0]]])", FormatResultSetYson(result.GetResultSet(0))); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(50, 3); - test(20); - bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0; - UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val() - << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 3); - test(2); - UNIT_ASSERT_VALUES_EQUAL(4, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(0, 3); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level3_InteractiveReadWrite() { - auto session = Session(); - - auto test = [&](ui32 count) { - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - - for (ui32 i = 0; i < count; ++i) { - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/KeyValue` LIMIT 1 - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - result = session.ExecuteDataQuery(Sprintf(R"( - UPDATE `/Root/TwoShard` SET Value2 = %d WHERE Key = 1 - )", i), TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - }; - - { - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(50, 3); - test(20); - bool diff = KqpCounters->NewEngineForcedQueryCount->Val() * KqpCounters->NewEngineCompatibleQueryCount->Val() != 0; - UNIT_ASSERT_C(diff, "forced: " << KqpCounters->NewEngineForcedQueryCount->Val() - << ", compatible: " << KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(100, 3); - test(2); - UNIT_ASSERT_VALUES_EQUAL(4, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } - - { - ForceNewEngine(0, 3); - test(2); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - } -} - -void KqpForceNewEngine::Level3_CompilationFailure() { - TestNotInteractiveReadOnlyTxFailedNECompilation(3); -} - -void KqpForceNewEngine::Level3_NoFallback() { - TestNotInteractiveReadOnlyTxFallback(3); -} - -void KqpForceNewEngine::Level3_ActiveRequestRead() { - auto session = Session(); - - ForceNewEngine(1, 2); - - // start request with forced OldEngine - TMaybe<TTransaction> tx; - while (!tx) { - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/TwoShard` LIMIT 1 - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - - if (KqpCounters->NewEngineForcedQueryCount->Val() == 1) { - // reroll :-) - - tx->Rollback().GetValueSync(); - session = Session(); - - KqpCounters->NewEngineForcedQueryCount->Set(0); - KqpCounters->NewEngineCompatibleQueryCount->Set(0); - continue; - } - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val()); - - tx = result.GetTransaction(); - } - - ForceNewEngine(100, 3); - - // dont change to NewEngine - - auto result = session.ExecuteDataQuery(R"( - REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne") - )", TTxControl::Tx(*tx)).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val()); - - result = session.ExecuteDataQuery(R"( - SELECT 42 - )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(1, KqpCounters->NewEngineCompatibleQueryCount->Val()); -} - -void KqpForceNewEngine::Level3_ActiveRequestWrite() { - auto session = Session(); - - ForceNewEngine(100, 2); - - // start request with forced OldEngine - auto result = session.ExecuteDataQuery(R"( - REPLACE INTO `/Root/TwoShard` (Key, Value1) VALUES (1, "OneOne") - )", TTxControl::BeginTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); - - ForceNewEngine(100, 3); - - // dont switch to NewEngine (have deferred effects) - - result = session.ExecuteDataQuery(R"( - SELECT 42 - )", TTxControl::Tx(*result.GetTransaction())).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - result = session.ExecuteDataQuery(R"( - SELECT 42 - )", TTxControl::Tx(*result.GetTransaction()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->ForceNewEngineCompileErrors->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineForcedQueryCount->Val()); - UNIT_ASSERT_VALUES_EQUAL(0, KqpCounters->NewEngineCompatibleQueryCount->Val()); -} - -} // namespace NKqp -} // namespace NKikimr |