diff options
author | spuchin <spuchin@ydb.tech> | 2022-11-03 18:54:45 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-11-03 18:54:45 +0300 |
commit | de0d350a9fcaa1df3682891fbada77ff32c92e5a (patch) | |
tree | 3cb6ebe7b2197d7d12035d39285b70362b79655f | |
parent | fa02a88cc4bc0b204f32604325af59b1bb1439c2 (diff) | |
download | ydb-de0d350a9fcaa1df3682891fbada77ff32c92e5a.tar.gz |
Remove OldEngine prepare/execution code. ()
24 files changed, 89 insertions, 3750 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h index 196f6098281..91379803825 100644 --- a/ydb/core/kqp/common/kqp_gateway.h +++ b/ydb/core/kqp/common/kqp_gateway.h @@ -29,41 +29,6 @@ struct TKqpParamsMap { class IKqpGateway : public NYql::IKikimrGateway { public: - struct TMkqlResult : public TGenericResult { - TString CompiledProgram; - NKikimrMiniKQL::TResult Result; - NKikimrQueryStats::TTxStats TxStats; - }; - - struct TMkqlSettings { - bool LlvmRuntime = false; - bool CollectStats = false; - TMaybe<ui64> PerShardKeysSizeLimitBytes; - ui64 CancelAfterMs = 0; - ui64 TimeoutMs = 0; - NYql::TKikimrQueryPhaseLimits Limits; - }; - - struct TRunResponse { - bool HasProxyError; - ui32 ProxyStatus; - TString ProxyStatusName; - TString ProxyStatusDesc; - - bool HasExecutionEngineError; - TString ExecutionEngineStatusName; - TString ExecutionEngineStatusDesc; - - NYql::TIssues Issues; - - TString MiniKQLErrors; - TString DataShardErrors; - NKikimrTxUserProxy::TMiniKQLCompileResults MiniKQLCompileResults; - - NKikimrMiniKQL::TResult ExecutionEngineEvaluatedResponse; - NKikimrQueryStats::TTxStats TxStats; - }; - struct TPhysicalTxData : private TMoveOnly { std::shared_ptr<const NKqpProto::TKqpPhyTx> Body; TKqpParamsMap Params; @@ -146,15 +111,6 @@ public: }; public: - /* Mkql */ - virtual NThreading::TFuture<TMkqlResult> ExecuteMkql(const TString& cluster, const TString& program, - TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) = 0; - - virtual NThreading::TFuture<TMkqlResult> ExecuteMkqlPrepared(const TString& cluster, const TString& program, - TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) = 0; - - virtual NThreading::TFuture<TMkqlResult> PrepareMkql(const TString& cluster, const TString& program) = 0; - /* Snapshots */ virtual NThreading::TFuture<TKqpSnapshotHandle> CreatePersistentSnapshot(const TVector<TString>& tablePaths, TDuration queryTimeout) = 0; diff --git a/ydb/core/kqp/host/CMakeLists.txt b/ydb/core/kqp/host/CMakeLists.txt index 5324cac490b..d0e18d46391 100644 --- a/ydb/core/kqp/host/CMakeLists.txt +++ b/ydb/core/kqp/host/CMakeLists.txt @@ -35,7 +35,6 @@ target_sources(core-kqp-host PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_explain_prepared.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_physical.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_prepared.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_scan.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_runner.cpp ) diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 8d80ff05c0a..85b0f0d8804 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -654,32 +654,21 @@ public: if (!AsyncResult) { auto& query = QueryCtx->PreparedQuery; YQL_ENSURE(QueryCtx->Type != EKikimrQueryType::Unspecified); + YQL_ENSURE(query->GetVersion() == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1); - if (query->GetVersion() == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1) { - if (CurrentKqlIndex) { - return TStatus::Ok; - } + if (CurrentKqlIndex) { + return TStatus::Ok; + } - std::shared_ptr<const NKqpProto::TKqpPhyQuery> phyQuery(query, &query->GetPhysicalQuery()); + std::shared_ptr<const NKqpProto::TKqpPhyQuery> phyQuery(query, &query->GetPhysicalQuery()); - if (QueryCtx->Type == EKikimrQueryType::Scan) { - AsyncResult = KqpRunner->ExecutePreparedScanQuery(Cluster, input.Get(), std::move(phyQuery), - ctx, ExecuteCtx->ReplyTarget); - } else { - YQL_ENSURE(QueryCtx->Type == EKikimrQueryType::Dml); - AsyncResult = KqpRunner->ExecutePreparedQueryNewEngine(Cluster, input.Get(), std::move(phyQuery), - ctx, ExecuteCtx->Settings); - } + if (QueryCtx->Type == EKikimrQueryType::Scan) { + AsyncResult = KqpRunner->ExecutePreparedScanQuery(Cluster, input.Get(), std::move(phyQuery), + ctx, ExecuteCtx->ReplyTarget); } else { - if (CurrentKqlIndex >= query->KqlsSize()) { - return TStatus::Ok; - } - - const auto& kql = query->GetKqls(CurrentKqlIndex); - YQL_ENSURE(QueryCtx->Type == EKikimrQueryType::Dml); - YQL_ENSURE(!kql.GetSettings().GetNewEngine()); - AsyncResult = KqpRunner->ExecutePreparedDataQuery(Cluster, input.Get(), kql, ctx, ExecuteCtx->Settings); + AsyncResult = KqpRunner->ExecutePreparedQueryNewEngine(Cluster, input.Get(), std::move(phyQuery), + ctx, ExecuteCtx->Settings); } } diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h index d000655c9da..d3d8966b1c6 100644 --- a/ydb/core/kqp/host/kqp_host_impl.h +++ b/ydb/core/kqp/host/kqp_host_impl.h @@ -320,10 +320,6 @@ public: const NYql::TExprNode::TPtr& query, NYql::TExprContext& ctx, const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0; - virtual TIntrusivePtr<TAsyncQueryResult> ExecutePreparedDataQuery(const TString& cluster, - NYql::TExprNode* queryExpr, const NKikimrKqp::TPreparedKql& kql, NYql::TExprContext& ctx, - const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0; - virtual TIntrusivePtr<TAsyncQueryResult> ExecutePreparedQueryNewEngine(const TString& cluster, const NYql::TExprNode::TPtr& world, std::shared_ptr<const NKqpProto::TKqpPhyQuery>&& phyQuery, NYql::TExprContext& ctx, const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0; @@ -337,12 +333,8 @@ TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, co TIntrusivePtr<NYql::TTypeAnnotationContext> typesCtx, TIntrusivePtr<NYql::TKikimrSessionContext> sessionCtx, const NMiniKQL::IFunctionRegistry& funcRegistry); -TAutoPtr<NYql::IGraphTransformer> CreateKqpExecutePreparedTransformer(TIntrusivePtr<IKqpGateway> gateway, - const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx); - TAutoPtr<NYql::IGraphTransformer> CreateKqpAcquireMvccSnapshotTransformer(TIntrusivePtr<IKqpGateway> gateway, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool newEngine = false); + TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx); TAutoPtr<NYql::IGraphTransformer> CreateKqpExecutePhysicalDataTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState, diff --git a/ydb/core/kqp/host/kqp_run_prepared.cpp b/ydb/core/kqp/host/kqp_run_prepared.cpp deleted file mode 100644 index b200da0f961..00000000000 --- a/ydb/core/kqp/host/kqp_run_prepared.cpp +++ /dev/null @@ -1,213 +0,0 @@ -#include "kqp_host_impl.h" - -#include <ydb/library/yql/utils/log/log.h> - -#include <google/protobuf/text_format.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NNodes; -using namespace NThreading; - -namespace { - -class TKqpExecutePreparedTransformer : public TGraphTransformerBase { -public: - TKqpExecutePreparedTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) - : Gateway(gateway) - , Cluster(cluster) - , TxState(txState) - , TransformCtx(transformCtx) - , CurrentMkqlIndex(0) - {} - - TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - output = input; - - const auto& kql = TransformCtx->GetPreparedKql(); - - // TODO: Enable after switch to NewEngine - // YQL_ENSURE(!TransformCtx->Config->HasKqpForceNewEngine()); - - if (CurrentMkqlIndex >= kql.MkqlsSize()) { - return Finish(ctx); - } - - const auto& mkql = kql.GetMkqls(CurrentMkqlIndex); - - AcquireLocks = ShouldAcquireLocks(kql); - - Promise = NewPromise(); - - if (!Execute(mkql, MkqlExecuteResult.Future)) { - return TStatus::Error; - } - - auto promise = Promise; - MkqlExecuteResult.Future.Apply([promise](const TFuture<IKqpGateway::TMkqlResult> future) mutable { - YQL_ENSURE(future.HasValue()); - promise.SetValue(); - }); - - return TStatus::Async; - } - - TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { - Y_UNUSED(input); - return Promise.GetFuture(); - } - - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - output = input; - - NKikimr::NKqp::IKqpGateway::TMkqlResult result(MkqlExecuteResult.Future.ExtractValue()); - result.ReportIssues(ctx.IssueManager); - if (!result.Success()) { - return TStatus::Error; - } - - const auto& mkql = TransformCtx->GetPreparedKql().GetMkqls(CurrentMkqlIndex); - if (mkql.GetIsPure()) { - Y_VERIFY_DEBUG(result.TxStats.TableAccessStatsSize() == 0); - } - - auto mkqlResult = MakeSimpleShared<NKikimrMiniKQL::TResult>(); - mkqlResult->Swap(&result.Result); - - if (AcquireLocks) { - if (!UnpackMergeLocks(*mkqlResult, TxState->Tx(), ctx)) { - return TStatus::Error; - } - } - - TransformCtx->MkqlResults.push_back(mkqlResult); - TransformCtx->AddMkqlStats(MkqlExecuteResult.Program, std::move(result.TxStats)); - - ++CurrentMkqlIndex; - return TStatus::Repeat; - } - - void Rewind() override { - CurrentMkqlIndex = 0; - } - -private: - TStatus Finish(TExprContext& ctx) { - const auto& kql = TransformCtx->GetPreparedKql(); - if (!kql.GetEffects().empty()) { - YQL_ENSURE(kql.GetEffects().size() == 1); - - auto& effect = kql.GetEffects(0); - - TExprNode::TPtr expr; - if (!GetExpr(effect.GetNodeAst(), expr, ctx)) { - return TStatus::Error; - } - - TVector<NKikimrKqp::TParameterBinding> bindings(effect.GetBindings().begin(), - effect.GetBindings().end()); - - bool preserveParams = !TransformCtx->Settings.GetCommitTx(); - if (!AddDeferredEffect(TExprBase(expr), bindings, ctx, *TxState, *TransformCtx, preserveParams)) { - return TStatus::Error; - } - } - return TStatus::Ok; - } - - bool Execute(const NKikimrKqp::TPreparedMkql& mkql, TFuture<IKqpGateway::TMkqlResult>& future) { - if (YQL_CLOG_ACTIVE(DEBUG, ProviderKqp)) { - TString mkqlText; - NProtoBuf::TextFormat::PrintToString(mkql, &mkqlText); - YQL_CLOG(DEBUG, ProviderKqp) << "Mkql:" << Endl << mkqlText; - } - - TVector<NKikimrKqp::TParameterBinding> bindings(mkql.GetBindings().begin(), mkql.GetBindings().end()); - auto execParams = BuildParamsMap(bindings, TxState, TransformCtx, AcquireLocks); - - if (YQL_CLOG_ACTIVE(TRACE, ProviderKqp)) { - TStringBuilder paramsTextBuilder; - for (auto& pair : execParams.Values) { - TString paramText; - NProtoBuf::TextFormat::PrintToString(pair.second.GetValue(), ¶mText); - paramsTextBuilder << pair.first << ": " << paramText << Endl; - } - - YQL_CLOG(TRACE, ProviderKqp) << "MiniKQL parameters:" << Endl << paramsTextBuilder; - } - - if (TransformCtx->QueryCtx->StatsMode == EKikimrStatsMode::Full) { - MkqlExecuteResult.Program = mkql.GetProgramText(); - } - - future = Gateway->ExecuteMkqlPrepared(Cluster, mkql.GetProgram(), std::move(execParams), - TransformCtx->GetMkqlSettings(false, Gateway->GetCurrentTime()), - TxState->Tx().GetSnapshot()); - return true; - } - - bool GetExpr(const TString& astStr, TExprNode::TPtr& expr, TExprContext& ctx) { - auto astRes = ParseAst(astStr); - ctx.IssueManager.AddIssues(astRes.Issues); - if (!astRes.IsOk()) { - return false; - } - - return CompileExpr(*astRes.Root, expr, ctx, nullptr); - } - - bool ShouldAcquireLocks(const NKikimrKqp::TPreparedKql& kql) { - if (*TxState->Tx().EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { - return false; - } - - if (TxState->Tx().Locks.Broken()) { - return false; // Do not acquire locks after first lock issue - } - - if (!TxState->Tx().DeferredEffects.Empty() || !kql.GetEffects().empty()) { - return true; // Acquire locks in read write tx - } - - if (!TransformCtx->Settings.GetCommitTx()) { - return true; // It is not a commit tx - } - - if (TxState->Tx().GetSnapshot().IsValid()) { - return false; // Read only tx with snapshot, no need to acquire locks - } - - if (CurrentMkqlIndex == kql.MkqlsSize() - 1 && !TxState->Tx().Locks.HasLocks()) { - return false; // Final phase of read only tx, no need to acquire locks - } - - return true; - } - -private: - TIntrusivePtr<IKqpGateway> Gateway; - TString Cluster; - - TIntrusivePtr<TKqpTransactionState> TxState; - TIntrusivePtr<TKqlTransformContext> TransformCtx; - - ui32 CurrentMkqlIndex; - bool AcquireLocks; - TMkqlExecuteResult MkqlExecuteResult; - TPromise<void> Promise; -}; - -} // namespace - -TAutoPtr<IGraphTransformer> CreateKqpExecutePreparedTransformer(TIntrusivePtr<IKqpGateway> gateway, - const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx) -{ - return new TKqpExecutePreparedTransformer(gateway, cluster, txState, transformCtx); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index c3a2196a66b..0eb7a61a13d 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -145,41 +145,9 @@ public: sessionCtx->TablesPtr())) , BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>()) { - KqlTypeAnnTransformer = CreateTypeAnnotationTransformer(CreateExtCallableTypeAnnotationTransformer(*typesCtx), - *typesCtx); - auto logLevel = NYql::NLog::ELevel::TRACE; auto logComp = NYql::NLog::EComponent::ProviderKqp; - KqlOptimizeTransformer = TTransformationPipeline(typesCtx) - .AddServiceTransformers() - .Add(TLogExprTransformer::Sync("KqlOptimizeTransformer", logComp, logLevel), "LogKqlOptimize") - .AddTypeAnnotationTransformer() - .AddPostTypeAnnotation() - .AddOptimization(false) - .Build(false); - - KqlPrepareTransformer = TTransformationPipeline(typesCtx) - .AddServiceTransformers() - .Add(TLogExprTransformer::Sync("KqlPrepareTransformer", logComp, logLevel), "LogKqlRun") - .Add(new TKqpIterationGuardTransformer(), "IterationGuard") - .AddTypeAnnotationTransformer() - .Add(CreateKqpCheckKiProgramTransformer(), "CheckQuery") - .Add(CreateKqpSimplifyTransformer(), "Simplify") - .Add(CreateKqpAnalyzeTransformer(TransformCtx), "Analyze") - .Add(CreateKqpRewriteTransformer(TransformCtx), "Rewrite") - .Add(CreateKqpExecTransformer(Gateway, Cluster, TxState, TransformCtx), "Prepare") - .Add(CreateKqpSubstituteTransformer(TxState, TransformCtx), "Substitute") - .Add(CreateKqpFinalizeTransformer(Gateway, Cluster, TxState, TransformCtx), "Finalize") - .Build(false); - - PreparedRunTransformer = TTransformationPipeline(typesCtx) - .Add(TLogExprTransformer::Sync("PreparedRun iteration", logComp, logLevel), "KqlPreparedRun") - .Add(CreateKqpAcquireMvccSnapshotTransformer(Gateway, TxState, TransformCtx), "AcquireMvccSnapshot") - .Add(CreateKqpExecutePreparedTransformer(Gateway, Cluster, TxState, TransformCtx), "ExecutePrepared") - .Add(CreateKqpFinalizeTransformer(Gateway, Cluster, TxState, TransformCtx), "Finalize") - .Build(false); - PreparedExplainTransformer = TTransformationPipeline(typesCtx) .Add(CreateKqpExplainPreparedTransformer(Gateway, Cluster, TransformCtx), "ExplainQuery") .Build(false); @@ -230,7 +198,7 @@ public: .Build(false); PhysicalRunQueryTransformer = TTransformationPipeline(typesCtx) - .Add(CreateKqpAcquireMvccSnapshotTransformer(Gateway, TxState, TransformCtx, true), "AcquireMvccSnapshot") + .Add(CreateKqpAcquireMvccSnapshotTransformer(Gateway, TxState, TransformCtx), "AcquireMvccSnapshot") .Add(CreateKqpExecutePhysicalDataTransformer(Gateway, Cluster, TxState, TransformCtx), "ExecutePhysical") .Build(false); @@ -277,34 +245,6 @@ public: return PrepareQueryInternal(cluster, dataQuery, ctx, scanSettings); } - TIntrusivePtr<TAsyncQueryResult> ExecutePreparedDataQuery(const TString& cluster, TExprNode* queryExpr, - const NKikimrKqp::TPreparedKql& kql, TExprContext& ctx, - const IKikimrQueryExecutor::TExecuteSettings& settings) override - { - YQL_ENSURE(false, "Unexpected query execute in OldEngine mode."); - - YQL_ENSURE(queryExpr->Type() == TExprNode::World); - YQL_ENSURE(cluster == Cluster); - - PreparedRunTransformer->Rewind(); - - TransformCtx->Reset(); - TransformCtx->Settings.CopyFrom(kql.GetSettings()); - TransformCtx->Settings.SetCommitTx(settings.CommitTx); - TransformCtx->Settings.SetRollbackTx(settings.RollbackTx); - TransformCtx->PreparedKql = &kql; - - YQL_ENSURE(TxState->Tx().EffectiveIsolationLevel); - YQL_ENSURE(TransformCtx->Settings.GetIsolationLevel() == NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); - - bool strictDml = MergeFlagValue(Config->StrictDml.Get(Cluster), settings.StrictDml); - if (!ApplyTableOperations(kql, strictDml, ctx)) { - return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); - } - - return MakeIntrusive<TAsyncRunResult>(queryExpr, ctx, *PreparedRunTransformer, *TransformCtx); - } - TIntrusivePtr<TAsyncQueryResult> ExecutePreparedQueryNewEngine(const TString& cluster, const NYql::TExprNode::TPtr& world, std::shared_ptr<const NKqpProto::TKqpPhyQuery>&& phyQuery, TExprContext& ctx, const IKikimrQueryExecutor::TExecuteSettings& settings) override @@ -516,10 +456,6 @@ private: TIntrusivePtr<TKqpOptimizeContext> OptimizeCtx; TIntrusivePtr<TKqpBuildQueryContext> BuildQueryCtx; - TAutoPtr<IGraphTransformer> KqlTypeAnnTransformer; - TAutoPtr<IGraphTransformer> KqlOptimizeTransformer; - TAutoPtr<IGraphTransformer> KqlPrepareTransformer; - TAutoPtr<IGraphTransformer> PreparedRunTransformer; TAutoPtr<IGraphTransformer> PreparedExplainTransformer; TAutoPtr<IGraphTransformer> PhysicalOptimizeTransformer; @@ -532,19 +468,17 @@ private: class TKqpAcquireMvccSnapshotTransformer : public TGraphTransformerBase { public: TKqpAcquireMvccSnapshotTransformer(TIntrusivePtr<IKqpGateway> gateway, TIntrusivePtr<TKqlTransformContext> transformCtx, - TIntrusivePtr<TKqpTransactionState> txState, bool newEngine) + TIntrusivePtr<TKqpTransactionState> txState) : Gateway(std::move(gateway)) , TransformCtx(std::move(transformCtx)) - , NewEngine(newEngine) - , TxState(std::move(txState)) - {} + , TxState(std::move(txState)) {} TStatus DoTransform(NYql::TExprNode::TPtr input, NYql::TExprNode::TPtr& output, NYql::TExprContext&) override { output = input; if (!NeedSnapshot(TxState->Tx(), *TransformCtx->Config, TransformCtx->Settings.GetRollbackTx(), - TransformCtx->Settings.GetCommitTx(), NewEngine ? TransformCtx->PhysicalQuery.get() : nullptr, - NewEngine ? nullptr : TransformCtx->PreparedKql)) { + TransformCtx->Settings.GetCommitTx(),TransformCtx->PhysicalQuery.get(), nullptr)) + { return TStatus::Ok; } @@ -605,13 +539,9 @@ public: } private: - -private: TIntrusivePtr<IKqpGateway> Gateway; TIntrusivePtr<TKqlTransformContext> TransformCtx; - bool NewEngine; - NThreading::TFuture<IKqpGateway::TKqpSnapshotHandle> SnapshotFuture; NThreading::TPromise<void> Promise; TIntrusivePtr<TKqpTransactionState> TxState; @@ -620,8 +550,8 @@ private: } // namespace TAutoPtr<NYql::IGraphTransformer> CreateKqpAcquireMvccSnapshotTransformer(TIntrusivePtr<IKqpGateway> gateway, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool newEngine) { - return new TKqpAcquireMvccSnapshotTransformer(gateway, transformCtx, txState, newEngine); + TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) { + return new TKqpAcquireMvccSnapshotTransformer(gateway, transformCtx, txState); } TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index bb0d0e71a81..407a7eec910 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -95,7 +95,7 @@ public: std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TlsActivationContext->ActorSystem(), true); Gateway = CreateKikimrIcGateway(Query.Cluster, Query.Database, std::move(loader), - ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, MakeMiniKQLCompileServiceID()); + ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters); Gateway->SetToken(Query.Cluster, UserToken); Config->FeatureFlags = AppData(ctx)->FeatureFlags; diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index 33d3d2ceb0c..7a407992e0e 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -6,9 +6,7 @@ #include <ydb/core/base/path.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/base/kikimr_issue.h> -#include <ydb/core/client/minikql_compile/mkql_compile_service.h> #include <ydb/core/cms/console/configs_dispatcher.h> -#include <ydb/core/engine/mkql_engine_flat.h> #include <ydb/core/engine/mkql_proto.h> #include <ydb/core/kqp/common/kqp_gateway.h> #include <ydb/core/kqp/executer/kqp_executer.h> @@ -25,7 +23,6 @@ #include <ydb/library/aclib/aclib.h> #include <ydb/public/lib/base/msgbus_status.h> -#include <ydb/library/yql/minikql/mkql_node_serialization.h> #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> @@ -45,8 +42,6 @@ using namespace NYql::NCommon; using namespace NSchemeShard; using namespace NKikimrSchemeOp; -using NKikimrTxUserProxy::TMiniKQLTransaction; - constexpr const IKqpGateway::TKqpSnapshot IKqpGateway::TKqpSnapshot::InvalidSnapshot = TKqpSnapshot(); #define STATIC_ASSERT_STATE_EQUAL(name) \ @@ -480,184 +475,6 @@ private: TVector<NYql::NDqProto::TDqExecutionStats> Executions; }; -class TMkqlRequestHandler : public TRequestHandlerBase< - TMkqlRequestHandler, - TEvTxUserProxy::TEvProposeTransaction, - TEvTxUserProxy::TEvProposeTransactionStatus, - IKqpGateway::TMkqlResult> -{ -public: - using TBase = typename TMkqlRequestHandler::TBase; - using TCallbackFunc = typename TBase::TCallbackFunc; - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus; - using TResult = IKqpGateway::TMkqlResult; - - TMkqlRequestHandler(const TAlignedPagePoolCounters& allocCounters, TRequest* request, - TKqpParamsMap&& paramsMap, TPromise<TResult> promise, TCallbackFunc callback, - const TActorId& miniKqlComplileServiceActorId) - : TBase(request, promise, callback) - , ParamsMap(std::move(paramsMap)) - , CompilationPending(false) - , CompilationRetried(false) - , AllocCounters(allocCounters) - , MiniKqlComplileServiceActorId(miniKqlComplileServiceActorId) - {} - - void Bootstrap(const TActorContext& ctx) { - auto& mkqlTx = *Request->Record.MutableTransaction()->MutableMiniKQLTransaction(); - - if (mkqlTx.HasProgram() && mkqlTx.GetProgram().HasText()) { - MkqlProgramText = mkqlTx.GetProgram().GetText(); - CompileProgram(ctx, mkqlTx.GetMode() == NKikimrTxUserProxy::TMiniKQLTransaction::COMPILE); - } - - mkqlTx.SetCollectStats(true); - YQL_ENSURE(!mkqlTx.HasParams()); - - if (!ParamsMap.Values.empty()) { - try { - mkqlTx.MutableParams()->SetBin(BuildParams(ParamsMap, AllocCounters, ctx)); - } - catch(const yexception& e) { - Promise.SetValue(ResultFromError<TResult>(e.what())); - this->Die(ctx); - return; - } - } - - ProceedWithExecution(ctx); - - this->Become(&TMkqlRequestHandler::AwaitState); - } - - void Handle(TMiniKQLCompileServiceEvents::TEvCompileStatus::TPtr &ev, const TActorContext &ctx) { - const auto& result = ev->Get()->Result; - auto& mkqlTx = *Request->Record.MutableTransaction()->MutableMiniKQLTransaction(); - - if (!result.Errors.Empty()) { - Promise.SetValue(ResultFromIssues<TResult>(GetMkqlCompileStatus(result.Errors), - "Query compilation failed", result.Errors)); - - this->Die(ctx); - return; - } - - if (mkqlTx.GetMode() == NKikimrTxUserProxy::TMiniKQLTransaction::COMPILE) { - TResult reply; - reply.SetSuccess(); - reply.CompiledProgram = result.CompiledProgram; - Promise.SetValue(std::move(reply)); - this->Die(ctx); - return; - } - - auto& pgm = *mkqlTx.MutableProgram(); - pgm.ClearText(); - pgm.SetBin(result.CompiledProgram); - - CompileResolveCookies = std::move(ev->Get()->CompileResolveCookies); - - CompilationPending = false; - ProceedWithExecution(ctx); - } - - void HandleResponse(TResponse::TPtr &ev, const TActorContext &ctx) { - auto& response = *ev->Get(); - ui32 status = response.Record.GetStatus(); - - bool resolveError = status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ResolveError || - status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable; - - if (resolveError && !CompilationRetried && !MkqlProgramText.empty()) { - CompilationRetried = true; - CompileProgram(ctx, false); - ProceedWithExecution(ctx); - return; - } - - Callback(Promise, std::move(*ev->Get())); - this->Die(ctx); - } - - using TBase::Handle; - - STFUNC(AwaitState) { - switch (ev->GetTypeRewrite()) { - HFunc(TResponse, HandleResponse); - HFunc(TEvTabletPipe::TEvClientConnected, Handle); - HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - HFunc(TMiniKQLCompileServiceEvents::TEvCompileStatus, Handle); - default: - TBase::HandleUnexpectedEvent("TMkqlRequestHandler", ev->GetTypeRewrite(), ctx); - } - } - -private: - static TString BuildParams(const TKqpParamsMap& paramsMap, const TAlignedPagePoolCounters& counters, - const TActorContext &ctx) - { - NMiniKQL::TScopedAlloc alloc(__LOCATION__, counters, AppData(ctx)->FunctionRegistry->SupportsSizedAllocators()); - NMiniKQL::TTypeEnvironment env(alloc); - - NMiniKQL::TStructLiteralBuilder structBuilder(env); - structBuilder.Reserve(paramsMap.Values.size()); - for (auto& pair : paramsMap.Values) { - const TString& name = pair.first; - const NYql::NDq::TMkqlValueRef& param = pair.second; - - auto valueNode = NMiniKQL::ImportValueFromProto(param.GetType(), param.GetValue(), env); - structBuilder.Add(name, valueNode); - } - - auto node = NMiniKQL::TRuntimeNode(structBuilder.Build(), true); - return NMiniKQL::SerializeRuntimeNode(node, env); - } - - NYql::EYqlIssueCode GetMkqlCompileStatus(const NYql::TIssues& issues) { - for (auto& issue : issues) { - switch (issue.GetCode()) { - case NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR: - return TIssuesIds::KIKIMR_SCHEME_MISMATCH; - - case NKikimrIssues::TIssuesIds::RESOLVE_LOOKUP_ERROR: - return TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE; - } - } - - return TIssuesIds::DEFAULT_ERROR; - } - - void CompileProgram(const TActorContext &ctx, bool forceRefresh) { - auto compileEv = MakeHolder<TMiniKQLCompileServiceEvents::TEvCompile>(MkqlProgramText); - compileEv->ForceRefresh = forceRefresh; - if (!CompileResolveCookies.empty()) { - compileEv->CompileResolveCookies = std::move(CompileResolveCookies); - } - ctx.Send(MiniKqlComplileServiceActorId, compileEv.Release()); - CompilationPending = true; - } - - void ProceedWithExecution(const TActorContext& ctx) { - if (!CompilationPending) { - TAutoPtr<TRequest> ev = new TRequest(); - ev->Record.CopyFrom(this->Request->Record); - - TActorId txproxy = MakeTxProxyID(); - ctx.Send(txproxy, ev.Release()); - } - } - -private: - TKqpParamsMap ParamsMap; - bool CompilationPending; - bool CompilationRetried; - TString MkqlProgramText; - THashMap<TString, ui64> CompileResolveCookies; - TAlignedPagePoolCounters AllocCounters; - TActorId MiniKqlComplileServiceActorId; -}; - class TSchemeOpRequestHandler: public TRequestHandlerBase< TSchemeOpRequestHandler, TEvTxUserProxy::TEvProposeTransaction, @@ -994,15 +811,13 @@ private: public: TKikimrIcGateway(const TString& cluster, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader, - TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const TActorId& mkqlComplileService) + TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters) : Cluster(cluster) , Database(database) , ActorSystem(actorSystem) , NodeId(nodeId) , Counters(counters) - , MetadataLoader(std::move(metadataLoader)) - , MkqlComplileService(mkqlComplileService) - {} + , MetadataLoader(std::move(metadataLoader)) {} bool HasCluster(const TString& cluster) override { return cluster == Cluster; @@ -1799,26 +1614,6 @@ public: } } - TFuture<TMkqlResult> ExecuteMkql(const TString& cluster, const TString& program, - TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) override - { - YQL_ENSURE(false, "Unexpected exec mkql in OldEngine mode."); - - return RunInternal(cluster, program, std::move(params), false, false, settings, snapshot); - } - - TFuture<TMkqlResult> ExecuteMkqlPrepared(const TString& cluster, const TString& program, - TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) override - { - YQL_ENSURE(false, "Unexpected exeс prepared mkql in OldEngine mode."); - return RunInternal(cluster, program, std::move(params), false, true, settings, snapshot); - } - - TFuture<TMkqlResult> PrepareMkql(const TString& cluster, const TString& program) override { - YQL_ENSURE(false, "Unexpected prepare mkql in OldEngine mode."); - return RunInternal(cluster, program, TKqpParamsMap(), true, false, TMkqlSettings()); - } - TFuture<TExecPhysicalResult> ExecutePhysical(TExecPhysicalRequest&& request, const NActors::TActorId& target) override { return ExecutePhysicalQueryInternal(std::move(request), target, false); } @@ -2202,17 +1997,6 @@ private: return promise.GetFuture(); } - TFuture<TMkqlResult> SendMkqlRequest(TEvTxUserProxy::TEvProposeTransaction* request, - TKqpParamsMap&& paramsMap, TMkqlRequestHandler::TCallbackFunc callback) - { - auto promise = NewPromise<TMkqlResult>(); - IActor* requestHandler = new TMkqlRequestHandler(Counters->Counters->AllocCounters, request, - std::move(paramsMap), promise, callback, MkqlComplileService); - RegisterActor(requestHandler); - - return promise.GetFuture(); - } - TFuture<TGenericResult> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request) { auto promise = NewPromise<TGenericResult>(); @@ -2250,91 +2034,6 @@ private: return cluster == Cluster; } - TFuture<TMkqlResult> RunInternal(const TString& cluster, const TString& program, TKqpParamsMap&& params, - bool compileOnly, bool prepared, const TMkqlSettings& settings, const TKqpSnapshot& snapshot = TKqpSnapshot::InvalidSnapshot) - { - using TRequest = TEvTxUserProxy::TEvProposeTransaction; - - try { - if (!CheckCluster(cluster)) { - return InvalidCluster<TMkqlResult>(cluster); - } - - auto ev = MakeHolder<TRequest>(); - ev->Record.SetDatabaseName(Database); - if (UserToken) { - ev->Record.SetUserToken(UserToken->Serialized); - } - auto& mkqlTx = *ev->Record.MutableTransaction()->MutableMiniKQLTransaction(); - mkqlTx.SetFlatMKQL(true); - mkqlTx.SetMode(compileOnly ? TMiniKQLTransaction::COMPILE : TMiniKQLTransaction::COMPILE_AND_EXEC); - - if (prepared) { - mkqlTx.MutableProgram()->SetBin(program); - } else { - mkqlTx.MutableProgram()->SetText(program); - } - - if (!compileOnly) { - if (settings.LlvmRuntime) { - mkqlTx.SetLlvmRuntime(true); - } - - if (settings.PerShardKeysSizeLimitBytes) { - mkqlTx.SetPerShardKeysSizeLimitBytes(*settings.PerShardKeysSizeLimitBytes); - } - - if (settings.CancelAfterMs) { - ev->Record.SetCancelAfterMs(settings.CancelAfterMs); - } - - if (settings.TimeoutMs) { - ev->Record.SetExecTimeoutPeriod(settings.TimeoutMs); - } - - const auto& limits = settings.Limits; - - if (limits.AffectedShardsLimit) { - mkqlTx.MutableLimits()->SetAffectedShardsLimit(limits.AffectedShardsLimit); - } - - if (limits.ReadsetCountLimit) { - mkqlTx.MutableLimits()->SetReadsetCountLimit(limits.ReadsetCountLimit); - } - - if (limits.ComputeNodeMemoryLimitBytes) { - mkqlTx.MutableLimits()->SetComputeNodeMemoryLimitBytes(limits.ComputeNodeMemoryLimitBytes); - } - - if (limits.TotalReadSizeLimitBytes) { - mkqlTx.MutableLimits()->SetTotalReadSizeLimitBytes(limits.TotalReadSizeLimitBytes); - } - - if (snapshot.IsValid()) { - mkqlTx.SetSnapshotStep(snapshot.Step); - mkqlTx.SetSnapshotTxId(snapshot.TxId); - } - } - - if (settings.CollectStats) { - mkqlTx.SetCollectStats(true); - } - - return SendMkqlRequest(ev.Release(), std::move(params), - [compileOnly] (TPromise<TMkqlResult> promise, TTransactionResponse&& response) { - try { - promise.SetValue(GetMkqlResult(GetRunResponse(std::move(response.Record)), compileOnly)); - } - catch (yexception& e) { - promise.SetValue(ResultFromException<TMkqlResult>(e)); - } - }); - } - catch (yexception& e) { - return MakeFuture(ResultFromException<TMkqlResult>(e)); - } - } - TFuture<TExecPhysicalResult> ExecutePhysicalQueryInternal(TExecPhysicalRequest&& request, const TActorId& target, bool streaming) { @@ -2378,40 +2077,6 @@ private: } private: - static TRunResponse GetRunResponse(NKikimrTxUserProxy::TEvProposeTransactionStatus&& ev) { - IKqpGateway::TRunResponse response; - - response.HasProxyError = ev.GetStatus() != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete - && ev.GetStatus() != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecAlready; - response.ProxyStatus = ev.GetStatus(); - if (response.HasProxyError) { - NMsgBusProxy::ExplainProposeTransactionStatus(ev.GetStatus(), response.ProxyStatusName, - response.ProxyStatusDesc); - } - - auto executionResponseStatus = static_cast<NMiniKQL::IEngineFlat::EStatus>( - ev.GetExecutionEngineResponseStatus()); - auto executionEngineStatus = static_cast<NMiniKQL::IEngineFlat::EResult>( - ev.GetExecutionEngineStatus()); - response.HasExecutionEngineError = executionResponseStatus == NMiniKQL::IEngineFlat::EStatus::Error && - executionEngineStatus != NMiniKQL::IEngineFlat::EResult::Ok; - if (response.HasExecutionEngineError) { - NMsgBusProxy::ExplainExecutionEngineStatus(ev.GetExecutionEngineStatus(), - response.ExecutionEngineStatusName, response.ExecutionEngineStatusDesc); - } - - NYql::IssuesFromMessage(ev.GetIssues(), response.Issues); - - response.MiniKQLErrors = ev.GetMiniKQLErrors(); - response.DataShardErrors = ev.GetDataShardErrors(); - response.MiniKQLCompileResults = ev.GetMiniKQLCompileResults(); - - response.ExecutionEngineEvaluatedResponse.Swap(ev.MutableExecutionEngineEvaluatedResponse()); - response.TxStats = ev.GetTxStats(); - - return response; - } - static TListPathResult GetListPathResult(const TPathDescription& pathDesc, const TString& path) { if (pathDesc.GetSelf().GetPathType() != EPathTypeDir) { return ResultFromError<TListPathResult>(TString("Directory not found: ") + path); @@ -2496,66 +2161,6 @@ private: } } - static TMkqlResult GetMkqlResult(TRunResponse&& response, bool compileOnly) { - auto& txRes = response.MiniKQLCompileResults; - - if (txRes.ProgramCompileErrorsSize() > 0) { - NYql::TIssues errors; - for (size_t i = 0, end = txRes.ProgramCompileErrorsSize(); i < end; ++i) { - const auto& err = txRes.GetProgramCompileErrors(i); - errors.AddIssue(NYql::IssueFromMessage(err)); - } - return ResultFromIssues<TMkqlResult>(TIssuesIds::KIKIMR_COMPILE_ERROR, "MiniKQL compilation error", - errors); - } - - YQL_ENSURE(txRes.ParamsCompileErrorsSize() == 0); - - if (!compileOnly) { - NYql::TIssues internalIssues; - if (response.HasExecutionEngineError) { - internalIssues.AddIssue(TIssue(NYql::TPosition(), TString("Execution engine failure (") - + response.ExecutionEngineStatusName + "): " + response.ExecutionEngineStatusDesc)); - } - - if (!response.DataShardErrors.empty()) { - internalIssues.AddIssue(TIssue(NYql::TPosition(), TString("Data shard errors: ") - + response.DataShardErrors)); - } - - if (!response.MiniKQLErrors.empty()) { - internalIssues.AddIssue(TIssue(NYql::TPosition(), TString("Execution engine errors: ") - + response.MiniKQLErrors)); - } - - if (response.HasProxyError) { - auto message = TString("Error executing transaction (") + response.ProxyStatusName + "): " - + response.ProxyStatusDesc; - - NYql::TIssue proxyIssue(NYql::TPosition(), message); - for (auto& issue : internalIssues) { - proxyIssue.AddSubIssue(MakeIntrusive<TIssue>(issue)); - } - for (auto& issue : response.Issues) { - proxyIssue.AddSubIssue(MakeIntrusive<TIssue>(issue)); - } - - return ResultFromIssues<TMkqlResult>(KikimrProxyErrorStatus(response.ProxyStatus), {proxyIssue}); - } - - if (!internalIssues.Empty()) { - return ResultFromErrors<TMkqlResult>(internalIssues); - } - } - - TMkqlResult result; - result.SetSuccess(); - result.CompiledProgram = txRes.GetCompiledProgram(); - result.Result.Swap(&response.ExecutionEngineEvaluatedResponse); - result.TxStats.Swap(&response.TxStats); - return result; - } - static NYql::EYqlIssueCode KikimrProxyErrorStatus(ui32 proxyStatus) { NYql::EYqlIssueCode status = TIssuesIds::DEFAULT_ERROR; @@ -2919,16 +2524,16 @@ private: TAlignedPagePoolCounters AllocCounters; TMaybe<TUserTokenData> UserToken; std::shared_ptr<IKqpTableMetadataLoader> MetadataLoader; - TActorId MkqlComplileService; }; } // namespace TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database, - std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, - const TActorId& mkqlComplileService) + std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem, + ui32 nodeId, TKqpRequestCounters::TPtr counters) { - return MakeIntrusive<TKikimrIcGateway>(cluster, database, std::move(metadataLoader), actorSystem, nodeId, counters, mkqlComplileService); + return MakeIntrusive<TKikimrIcGateway>(cluster, database, std::move(metadataLoader), actorSystem, nodeId, + counters); } } // namespace NKqp diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h index 0f4a047fca6..368d9ca2209 100644 --- a/ydb/core/kqp/kqp_impl.h +++ b/ydb/core/kqp/kqp_impl.h @@ -84,8 +84,8 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters); TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database, - std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, - const TActorId& MkqlCompileService); + std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, + ui32 nodeId, TKqpRequestCounters::TPtr counters); TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const NYql::TIssue& issue); Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult); diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 3b2a3dd8e3d..3fa34288736 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -139,7 +139,7 @@ public: std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TlsActivationContext->ActorSystem(), false); Gateway = CreateKikimrIcGateway(Settings.Cluster, Settings.Database, std::move(loader), - ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, MakeMiniKQLCompileServiceID()); + ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters); Config->FeatureFlags = AppData(ctx)->FeatureFlags; diff --git a/ydb/core/kqp/prepare/CMakeLists.txt b/ydb/core/kqp/prepare/CMakeLists.txt index ae03f2ae962..a9dfdeee9f6 100644 --- a/ydb/core/kqp/prepare/CMakeLists.txt +++ b/ydb/core/kqp/prepare/CMakeLists.txt @@ -22,17 +22,12 @@ target_link_libraries(core-kqp-prepare PUBLIC tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-prepare PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_analyze.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_exec.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_finalize.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_plan.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_rewrite.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_simplify.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_substitute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_type_ann.cpp ) generate_enum_serilization(core-kqp-prepare - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_prepare_impl.h + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_plan.h INCLUDE_HEADERS - ydb/core/kqp/prepare/kqp_prepare_impl.h + ydb/core/kqp/prepare/kqp_query_plan.h ) diff --git a/ydb/core/kqp/prepare/kqp_prepare.h b/ydb/core/kqp/prepare/kqp_prepare.h index 27c956ea148..a03b40d1a6e 100644 --- a/ydb/core/kqp/prepare/kqp_prepare.h +++ b/ydb/core/kqp/prepare/kqp_prepare.h @@ -6,74 +6,6 @@ namespace NKikimr { namespace NKqp { -enum class ECommitSafety { - Full, - Safe, - Moderate -}; - -struct TExprScope { - NYql::NNodes::TCallable Callable; - NYql::NNodes::TCoLambda Lambda; - ui32 Depth; - - TExprScope(NYql::NNodes::TCallable callable, NYql::NNodes::TCoLambda Lambda, ui32 depth) - : Callable(callable) - , Lambda(Lambda) - , Depth(depth) {} -}; - -struct TNodeInfo { - TMaybe<TExprScope> Scope; - bool IsImmediate; - bool RequireImmediate; - bool IsExecutable; - bool AreInputsExecutable; - - TNodeInfo() - : IsImmediate(false) - , RequireImmediate(false) - , IsExecutable(false) - , AreInputsExecutable(false) {} -}; - -using TExprToExprListMap = THashMap<const NYql::TExprNode*, TVector<NYql::NNodes::TExprBase>>; -using TExprToNodeInfoMap = THashMap<const NYql::TExprNode*, TNodeInfo>; - -struct TScopedNode { - NYql::NNodes::TExprBase Node; - TMaybe<TExprScope> Scope; - - TScopedNode(NYql::NNodes::TExprBase node, TMaybe<TExprScope> scope = {}) - : Node(node) - , Scope(scope) {} -}; - -struct TKqpAnalyzeResults { - bool CanExecute; - TVector<TScopedNode> ExecutionRoots; - TExprToNodeInfoMap ExprToNodeInfoMap; - TExprToExprListMap LambdaToExecRootsMap; - TExprToExprListMap CallableToExecRootsMap; -}; - -using TMkqlResult = NKikimrMiniKQL::TResult; - -struct TMkqlExecuteResult { - TString Program; - NThreading::TFuture<IKqpGateway::TMkqlResult> Future; - - TMkqlExecuteResult(const TString& program, const NThreading::TFuture<IKqpGateway::TMkqlResult>& future) - : Program(program) - , Future(future) {} - - TMkqlExecuteResult(const NThreading::TFuture<IKqpGateway::TMkqlResult>& future) - : Program() - , Future(future) {} - - TMkqlExecuteResult() {} -}; - struct TKqlTransformContext : TThrRefBase { TKqlTransformContext(NYql::TKikimrConfiguration::TPtr& config, TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx, TIntrusivePtr<NYql::TKikimrTablesData> tables) @@ -87,49 +19,17 @@ struct TKqlTransformContext : TThrRefBase { NKikimrKqp::TKqlSettings Settings; NActors::TActorId ReplyTarget; - TKqpAnalyzeResults AnalyzeResults; - NKikimrKqp::TPreparedKql* PreparingKql = nullptr; - const NKikimrKqp::TPreparedKql* PreparedKql; NKqpProto::TKqpStatsQuery QueryStats; std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery; TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults; TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults; - ECommitSafety CommitSafety() const { - auto safetyValue = Config->CommitSafety.Get().GetRef(); - if (safetyValue == "Full") { - return ECommitSafety::Full; - } else if (safetyValue == "Safe") { - return ECommitSafety::Safe; - } else if (safetyValue == "Moderate") { - return ECommitSafety::Moderate; - } - - YQL_ENSURE(false, "Unexpected value for CommitSafety."); - } - - NKikimrKqp::TPreparedKql& GetPreparingKql() { - YQL_ENSURE(PreparingKql); - return *PreparingKql; - } - - const NKikimrKqp::TPreparedKql& GetPreparedKql() { - YQL_ENSURE(PreparedKql); - return *PreparedKql; - } - - IKqpGateway::TMkqlSettings GetMkqlSettings(bool hasDataEffects, TInstant now) const; - void AddMkqlStats(const TString& program, NKikimrQueryStats::TTxStats&& txStats); - void Reset() { Settings = {}; ReplyTarget = {}; - AnalyzeResults = {}; MkqlResults.clear(); QueryStats = {}; - PreparingKql = nullptr; - PreparedKql = nullptr; PhysicalQuery = nullptr; PhysicalQueryResults.clear(); } @@ -154,7 +54,6 @@ bool AddDeferredEffect(NYql::NNodes::TExprBase effect, const TVector<NKikimrKqp: bool AddDeferredEffect(NYql::NNodes::TExprBase effect, NYql::TExprContext& ctx, TKqpTransactionState& txState, TKqlTransformContext& transformCtx, bool preserveParamValues); -NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock); bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx, NYql::TExprContext& ctx); @@ -162,31 +61,10 @@ bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx); -bool UnpackMergeLocks(const NKikimrMiniKQL::TResult& result, TKqpTransactionContext& txCtx, NYql::TExprContext& ctx); - -TKqpParamsMap BuildParamsMap(const TVector<NKikimrKqp::TParameterBinding>& bindings, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool acquireLocks); - -TVector<NKikimrKqp::TParameterBinding> CollectParams(NYql::NNodes::TExprBase query); - -NKikimrMiniKQL::TParams GetLocksParamValue(const TKqpTxLocks& locks); - -TAutoPtr<NYql::IGraphTransformer> CreateKqpSimplifyTransformer(); -TAutoPtr<NYql::IGraphTransformer> CreateKqpAnalyzeTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx); -TAutoPtr<NYql::IGraphTransformer> CreateKqpRewriteTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx); -TAutoPtr<NYql::IGraphTransformer> CreateKqpExecTransformer(TIntrusivePtr<IKqpGateway> gateway, - const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx); -TAutoPtr<NYql::IGraphTransformer> CreateKqpSubstituteTransformer(TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx); -TAutoPtr<NYql::IGraphTransformer> CreateKqpFinalizeTransformer(TIntrusivePtr<IKqpGateway> gateway, - const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx); - TAutoPtr<NYql::IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cluster, TIntrusivePtr<NYql::TKikimrTablesData> tablesData, NYql::TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config); -TAutoPtr<NYql::IGraphTransformer> CreateKqpCheckKiProgramTransformer(); + TAutoPtr<NYql::IGraphTransformer> CreateKqpCheckQueryTransformer(); } // namespace NKqp diff --git a/ydb/core/kqp/prepare/kqp_prepare_impl.h b/ydb/core/kqp/prepare/kqp_prepare_impl.h deleted file mode 100644 index 1d6e4c5064a..00000000000 --- a/ydb/core/kqp/prepare/kqp_prepare_impl.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once - -#include "kqp_prepare.h" - -namespace NKikimr { -namespace NKqp { - -enum class ETableReadType { - Unspecified, - FullScan, - Scan, - Lookup, - MultiLookup, -}; - -enum class ETableWriteType { - Unspecified, - Upsert, - MultiUpsert, - Erase, - MultiErase, -}; - -bool HasEffects(const NYql::NNodes::TKiProgram& program); -bool HasResults(const NYql::NNodes::TKiProgram& program); - -NYql::NNodes::TCoList GetEmptyEffectsList(NYql::TPositionHandle pos, NYql::TExprContext& ctx); - -TMkqlExecuteResult ExecuteMkql(NYql::NNodes::TKiProgram program, TIntrusivePtr<IKqpGateway> gateway, - const TString& cluster, NYql::TExprContext& ctx, TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx, bool hasDataEffects); - -void LogMkqlResult(const NKikimrMiniKQL::TResult& result, NYql::TExprContext& ctx); - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/prepare/kqp_query_analyze.cpp b/ydb/core/kqp/prepare/kqp_query_analyze.cpp deleted file mode 100644 index b37e4eb0582..00000000000 --- a/ydb/core/kqp/prepare/kqp_query_analyze.cpp +++ /dev/null @@ -1,566 +0,0 @@ -#include "kqp_prepare_impl.h" - -#include <ydb/library/yql/core/yql_expr_optimize.h> -#include <ydb/library/yql/core/yql_expr_type_annotation.h> -#include <ydb/library/yql/utils/log/log.h> - -#include <util/generic/queue.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NNodes; - -namespace { - -const THashSet<TStringBuf> SafeCallables { - TCoJust::CallableName(), - TCoCoalesce::CallableName(), - TCoToOptional::CallableName(), - TCoHead::CallableName(), - TCoLast::CallableName(), - TCoToList::CallableName(), - - TCoMember::CallableName(), - TCoAsStruct::CallableName(), - - TCoNothing::CallableName(), - TCoNull::CallableName(), - TCoDefault::CallableName(), - TCoExists::CallableName(), - - TCoNth::CallableName() -}; - -const THashSet<TStringBuf> ModerateCallables { - TCoAddMember::CallableName(), - TCoReplaceMember::CallableName(), -}; - -bool IsSafePayloadCallable(const TCallable& callable) { - if (callable.Maybe<TCoDataCtor>()) { - return true; - } - - if (callable.Maybe<TCoCompare>()) { - return true; - } - - if (callable.Maybe<TCoAnd>()) { - return true; - } - - if (callable.Maybe<TCoOr>()) { - return true; - } - - if (callable.Maybe<TCoBinaryArithmetic>()) { - return true; - } - - if (callable.Maybe<TCoCountBase>()) { - return true; - } - - auto isOptInput = [](TExprBase input) { - return input.Maybe<TCoToList>() || input.Maybe<TCoTake>().Input().Maybe<TCoToList>(); - }; - - if (auto maybeFilter = callable.Maybe<TCoFilterBase>()) { - auto filter = maybeFilter.Cast(); - - if (filter.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) { - return true; - } - - if (isOptInput(filter.Input())) { - return true; - } - - return false; - } - - if (auto maybeMap = callable.Maybe<TCoMapBase>()) { - auto map = maybeMap.Cast(); - - if (map.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) { - return true; - } - - if (isOptInput(map.Input())) { - if (maybeMap.Maybe<TCoMap>()) { - return true; - } else { - auto body = map.Lambda().Body(); - - return body.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional || - body.Maybe<TCoToList>() || body.Maybe<TCoAsList>(); - } - } - - return false; - } - - return SafeCallables.contains(callable.CallableName()); -} - -bool IsModeratePayloadCallable(TCoNameValueTupleList key, const TCallable& callable) { - if (IsSafePayloadCallable(callable)) { - return true; - } - - if (auto selectRow = callable.Maybe<TKiSelectRow>()) { - return selectRow.Cast().Key().Raw() == key.Raw(); - } - - return ModerateCallables.contains(callable.CallableName()); -} - -struct TAnalyzeTxContext { - TExprToNodeInfoMap NodesInfo; - TVector<TScopedNode> ExecutionRoots; -}; - -void GatherNodeScopes(TExprBase root, TAnalyzeTxContext& analyzeCtx) { - TNodeSet visitedNodes; - - TQueue<TMaybe<TExprScope>> scopesQueue; - scopesQueue.push(TMaybe<TExprScope>()); - - while (!scopesQueue.empty()) { - auto scope = scopesQueue.front(); - scopesQueue.pop(); - - auto scopeRoot = scope - ? scope->Lambda.Body() - : root; - - VisitExpr(scopeRoot.Ptr(), [scope, &scopesQueue, &analyzeCtx] (const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - - if (node.Maybe<TCoLambda>()) { - return false; - } - - if (auto callable = node.Maybe<TCallable>()) { - auto scopeDepth = scope ? scope->Depth : 0; - - for (const auto& arg : node.Cast<TVarArgCallable<TExprBase>>()) { - if (auto lambda = arg.Maybe<TCoLambda>()) { - TExprScope newScope(callable.Cast(), lambda.Cast(), scopeDepth + 1); - scopesQueue.push(newScope); - - for (const auto& lambdaArg : lambda.Cast().Args()) { - analyzeCtx.NodesInfo[lambdaArg.Raw()].Scope = newScope; - } - } - } - } - - return true; - }, visitedNodes); - }; - - - VisitExpr(root.Ptr(), - [] (const TExprNode::TPtr& exprNode) { - Y_UNUSED(exprNode); - return true; - }, - [&analyzeCtx] (const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - - auto& nodeInfo = analyzeCtx.NodesInfo[node.Raw()]; - if (!node.Maybe<TCoArgument>()) { - YQL_ENSURE(!nodeInfo.Scope); - } - - TMaybe<TExprScope> scope = nodeInfo.Scope; - for (const auto& child : exprNode->Children()) { - if (TMaybeNode<TCoLambda>(child)) { - continue; - } - - auto childScope = analyzeCtx.NodesInfo[child.Get()].Scope; - if (childScope) { - if (scope) { - scope = childScope->Depth > scope->Depth - ? childScope - : scope; - } else { - scope = childScope; - } - } - } - - analyzeCtx.NodesInfo[node.Raw()].Scope = scope; - return true; - }); -} - -void RequireImmediate(TExprBase node, TAnalyzeTxContext& ctx) { - ctx.NodesInfo[node.Raw()].RequireImmediate = true; -} - -void RequireImmediateKey(TCoNameValueTupleList key, TAnalyzeTxContext& ctx) { - for (auto tuple : key) { - YQL_ENSURE(tuple.Value().IsValid()); - RequireImmediate(tuple.Value().Cast(), ctx); - } -} - -void RequireImmediateRange(TExprList range, TAnalyzeTxContext& ctx) { - for (auto tuple : range) { - if (auto columnRange = tuple.Maybe<TKiColumnRangeTuple>()) { - RequireImmediate(columnRange.Cast().From(), ctx); - RequireImmediate(columnRange.Cast().To(), ctx); - } - } -} - -void RequireImmediateSettings(TCoNameValueTupleList settings, TAnalyzeTxContext& ctx) { - for (auto setting : settings) { - if (setting.Value() && setting.Value().Ref().IsComputable()) { - RequireImmediate(setting.Value().Cast(), ctx); - } - } -} - -void RequireEffectPayloadSafety(TCoNameValueTupleList key, TCoNameValueTupleList payload, TAnalyzeTxContext& ctx, - ECommitSafety commitSafety) -{ - switch (commitSafety) { - case ECommitSafety::Full: - for (auto tuple : payload) { - YQL_ENSURE(tuple.Value().IsValid()); - RequireImmediate(tuple.Value().Cast(), ctx); - } - return; - - case ECommitSafety::Safe: - case ECommitSafety::Moderate: - break; - - default: - YQL_ENSURE(false, "Unexpected commit safety level."); - } - - VisitExpr(payload.Ptr(), [&ctx, key, commitSafety] (const TExprNode::TPtr& exprNode) { - TExprBase node(exprNode); - - if (node.Maybe<TCoLambda>()) { - return false; - } - - if (node.Maybe<TCoArgument>()) { - return false; - } - - if (!node.Ref().IsComputable()) { - return true; - } - - if (ctx.NodesInfo[node.Raw()].IsImmediate) { - return false; - } - - if (auto maybeList = node.Maybe<TExprList>()) { - return true; - } - - if (auto maybeCallable = node.Maybe<TCallable>()) { - auto callable = maybeCallable.Cast(); - - bool safeCallable = commitSafety == ECommitSafety::Safe - ? IsSafePayloadCallable(callable) - : IsModeratePayloadCallable(key, callable); - - if (!safeCallable) { - RequireImmediate(callable, ctx); - return false; - } - - for (const auto& arg : callable.Cast<TVarArgCallable<TExprBase>>()) { - if (auto lambda = arg.Maybe<TCoLambda>()) { - auto badNode = FindNode(lambda.Cast().Body().Ptr(), - [key, commitSafety] (const TExprNode::TPtr& node) { - if (!node->IsCallable()) { - return false; - } - - auto callable = TCallable(node); - bool safeCallable = commitSafety == ECommitSafety::Safe - ? IsSafePayloadCallable(callable) - : IsModeratePayloadCallable(key, callable); - - return !safeCallable; - }); - - if (badNode) { - RequireImmediate(callable, ctx); - return false; - } - } - } - - return true; - } - - RequireImmediate(node, ctx); - return false; - }); -} - -void MarkImmediateNodes(TExprBase node, TAnalyzeTxContext& ctx) { - if (node.Maybe<TCoDataType>() || - node.Maybe<TCoOptionalType>()) - { - ctx.NodesInfo[node.Raw()].IsImmediate = true; - } - - if (node.Maybe<TCoDataCtor>() || node.Maybe<TCoVoid>() || node.Maybe<TCoNothing>() || node.Maybe<TCoNull>()) { - ctx.NodesInfo[node.Raw()].IsImmediate = true; - } - - if (node.Maybe<TCoParameter>()) { - ctx.NodesInfo[node.Raw()].IsImmediate = true; - } - - if (node.Maybe<TKiMapParameter>() || node.Maybe<TKiFlatMapParameter>()) { - auto mapLambda = node.Maybe<TKiMapParameter>() - ? node.Cast<TKiMapParameter>().Lambda() - : node.Cast<TKiFlatMapParameter>().Lambda(); - - ctx.NodesInfo[mapLambda.Args().Arg(0).Raw()].IsImmediate = true; - } -} - -void MarkRequireImmediateNodes(TExprBase node, TAnalyzeTxContext& ctx) { - if (auto selectRow = node.Maybe<TKiSelectRow>()) { - RequireImmediateKey(selectRow.Cast().Key(), ctx); - } - - if (auto selectRange = node.Maybe<TKiSelectRange>()) { - RequireImmediateRange(selectRange.Cast().Range(), ctx); - RequireImmediateSettings(selectRange.Cast().Settings(), ctx); - } - - if (auto updateRow = node.Maybe<TKiUpdateRow>()) { - RequireImmediateKey(updateRow.Cast().Key(), ctx); - } - - if (auto eraseRow = node.Maybe<TKiEraseRow>()) { - RequireImmediateKey(eraseRow.Cast().Key(), ctx); - } - - if (node.Maybe<TKiMapParameter>() || node.Maybe<TKiFlatMapParameter>()) { - TExprBase input = node.Maybe<TKiMapParameter>() - ? node.Cast<TKiMapParameter>().Input() - : node.Cast<TKiFlatMapParameter>().Input(); - - ctx.NodesInfo[input.Raw()].RequireImmediate = true; - } - - if (auto condEffect = node.Maybe<TKiConditionalEffect>()) { - ctx.NodesInfo[condEffect.Cast().Predicate().Raw()].RequireImmediate = true; - } -} - -void PropagateImmediateNodes(TExprBase node, TAnalyzeTxContext& ctx) { - if (auto just = node.Maybe<TCoJust>()) { - if (ctx.NodesInfo[just.Cast().Input().Raw()].IsImmediate) { - ctx.NodesInfo[node.Raw()].IsImmediate = true; - } - } - - if (auto nth = node.Maybe<TCoNth>()) { - if (ctx.NodesInfo[nth.Cast().Tuple().Raw()].IsImmediate) { - ctx.NodesInfo[node.Raw()].IsImmediate = true; - } - } - - if (auto member = node.Maybe<TCoMember>()) { - if (ctx.NodesInfo[member.Cast().Struct().Raw()].IsImmediate) { - ctx.NodesInfo[node.Raw()].IsImmediate = true; - } - } -} - -void EnsureNodesSafety(TExprBase node, TAnalyzeTxContext& ctx, ECommitSafety commitSafety, bool topLevel) { - if (auto updateRow = node.Maybe<TKiUpdateRow>()) { - RequireEffectPayloadSafety(updateRow.Cast().Key(), updateRow.Cast().Update(), ctx, - topLevel ? commitSafety : ECommitSafety::Full); - } -} - -bool EnsureEffectsSafety(TExprBase node, TAnalyzeTxContext& ctx) { - bool hasNewRequirements = false; - - VisitExpr(node.Ptr(), [&ctx, &hasNewRequirements] (const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - auto& nodeInfo = ctx.NodesInfo[node.Raw()]; - - if (nodeInfo.IsImmediate || nodeInfo.RequireImmediate) { - return false; - } - - if (node.Maybe<TKiUpdateRow>() || node.Maybe<TKiEraseRow>()) { - return false; - } - - if (node.Maybe<TKiSelectRow>() || node.Maybe<TKiSelectRange>()) { - hasNewRequirements = true; - RequireImmediate(node, ctx); - return false; - } - - return true; - }); - - return !hasNewRequirements; -} - -void AnalyzeNode(TExprBase node, TAnalyzeTxContext& ctx) { - auto& nodeInfo = *ctx.NodesInfo.FindPtr(node.Raw()); - - nodeInfo.IsExecutable = true; - nodeInfo.AreInputsExecutable = true; - - for (const auto& child : node.Ptr()->Children()) { - const auto& childInfo = *ctx.NodesInfo.FindPtr(child.Get()); - - bool canExecute = childInfo.IsExecutable; - if (canExecute) { - if (childInfo.RequireImmediate && !childInfo.IsImmediate) { - ctx.ExecutionRoots.emplace_back(TExprBase(child), childInfo.Scope); - canExecute = false; - } - } - - nodeInfo.IsExecutable = nodeInfo.IsExecutable && canExecute; - - if (!TMaybeNode<TCoLambda>(child)) { - nodeInfo.AreInputsExecutable = nodeInfo.AreInputsExecutable && canExecute; - } - - if (!nodeInfo.IsExecutable && !nodeInfo.AreInputsExecutable) { - break; - } - } -} - -void AnalyzeNodes(const TVector<TExprBase>& nodes, TAnalyzeTxContext& ctx) { - YQL_ENSURE(ctx.ExecutionRoots.empty()); - - for (auto& node : nodes) { - AnalyzeNode(node, ctx); - } -} - -bool Analyze(const TExprNode::TPtr& exprRoot, TKqpAnalyzeResults& results, ECommitSafety commitSafety) { - TAnalyzeTxContext analyzeCtx; - TVector<TExprBase> nodes; - - GatherNodeScopes(TExprBase(exprRoot), analyzeCtx); - - VisitExpr(exprRoot, - [&analyzeCtx] (const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - - MarkImmediateNodes(node, analyzeCtx); - MarkRequireImmediateNodes(node, analyzeCtx); - return true; - }, - [&analyzeCtx, &nodes, commitSafety] (const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - auto scope = analyzeCtx.NodesInfo[exprNode.Get()].Scope; - - PropagateImmediateNodes(node, analyzeCtx); - EnsureNodesSafety(node, analyzeCtx, commitSafety, !scope); - - nodes.push_back(node); - return true; - }); - - AnalyzeNodes(nodes, analyzeCtx); - - auto& rootInfo = analyzeCtx.NodesInfo[exprRoot.Get()]; - - if (rootInfo.IsExecutable) { - if (!EnsureEffectsSafety(TKiProgram(exprRoot).Effects(), analyzeCtx)) { - AnalyzeNodes(nodes, analyzeCtx); - } - } - - results.CanExecute = rootInfo.IsExecutable; - results.ExprToNodeInfoMap = std::move(analyzeCtx.NodesInfo); - - results.ExecutionRoots.clear(); - results.CallableToExecRootsMap.clear(); - results.LambdaToExecRootsMap.clear(); - for (auto& execRoot : analyzeCtx.ExecutionRoots) { - if (execRoot.Scope) { - auto callableInfo = results.ExprToNodeInfoMap.FindPtr(execRoot.Scope->Callable.Raw()); - YQL_ENSURE(callableInfo); - - if (!callableInfo->AreInputsExecutable) { - continue; - } - - results.LambdaToExecRootsMap[execRoot.Scope->Lambda.Raw()].push_back(execRoot.Node); - results.CallableToExecRootsMap[execRoot.Scope->Callable.Raw()].push_back(execRoot.Node); - } - - results.ExecutionRoots.push_back(execRoot); - } - - YQL_ENSURE(results.CanExecute || !results.ExecutionRoots.empty()); - - return true; -} - -class TKqpAnalyzeTransformer : public TSyncTransformerBase { -public: - TKqpAnalyzeTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx) - : TransformCtx(transformCtx) {} - - TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - Y_UNUSED(ctx); - output = input; - - auto commitSafety = TransformCtx->CommitSafety(); - if (!TransformCtx->Config->HasAllowKqpUnsafeCommit()) { - switch (commitSafety) { - case ECommitSafety::Full: - case ECommitSafety::Safe: - break; - default: - ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::KIKIMR_BAD_OPERATION, - "Unsafe commits not allowed for current database.")); - return TStatus::Error; - } - } - - if (!Analyze(input.Get(), TransformCtx->AnalyzeResults, commitSafety)) { - return TStatus::Error; - } - - YQL_CLOG(DEBUG, ProviderKqp) << "Analyze results:" << Endl - << "CanExecute: " << TransformCtx->AnalyzeResults.CanExecute << Endl - << "ExecutionRoots: " << TransformCtx->AnalyzeResults.ExecutionRoots.size(); - return TStatus::Ok; - } - -private: - TIntrusivePtr<TKqlTransformContext> TransformCtx; -}; - -} // namespace - -TAutoPtr<IGraphTransformer> CreateKqpAnalyzeTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx) { - return new TKqpAnalyzeTransformer(transformCtx); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/prepare/kqp_query_exec.cpp b/ydb/core/kqp/prepare/kqp_query_exec.cpp index bc2d4e94ce9..bc71e4ec0b3 100644 --- a/ydb/core/kqp/prepare/kqp_query_exec.cpp +++ b/ydb/core/kqp/prepare/kqp_query_exec.cpp @@ -1,4 +1,4 @@ -#include "kqp_prepare_impl.h" +#include "kqp_prepare.h" #include <ydb/core/engine/mkql_engine_flat.h> #include <ydb/core/kqp/common/kqp_yql.h> @@ -17,671 +17,6 @@ using namespace NYql::NNodes; using namespace NYql::NCommon; using namespace NThreading; -namespace { - -struct TTransformState { - TTransformState(TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) - : ParamsState(txState->Tx().ParamsState) - , TransformCtx(transformCtx) {} - - TIntrusivePtr<TKqpTransactionContext::TParamsState> ParamsState; - TIntrusivePtr<TKqlTransformContext> TransformCtx; -}; - -struct TParamBinding { - TString Name; - TMaybe<ui32> ResultIndex; - - TParamBinding(const TString& name, TMaybe<ui32> resultIndex = {}) - : Name(name) - , ResultIndex(resultIndex) {} -}; - -NKikimrKqp::TParameterBinding GetParameterBinding(TCoParameter param) { - auto name = TString(param.Name().Value()); - - NKikimrKqp::TParameterBinding binding; - binding.SetName(name); - if (param.Ref().HasResult()) { - auto indexTuple = TCoAtomList(TExprNode::GetResult(param.Ptr())); - ui32 mkqlIndex = FromString<ui32>(indexTuple.Item(0).Value()); - ui32 resultIndex = FromString<ui32>(indexTuple.Item(1).Value()); - binding.SetMkqlIndex(mkqlIndex); - binding.SetResultIndex(resultIndex); - } - - return binding; -} - -NDq::TMkqlValueRef GetParamFromResult(const NKikimrKqp::TParameterBinding& binding, - const TKqlTransformContext& transformCtx) -{ - YQL_ENSURE(binding.HasMkqlIndex()); - YQL_ENSURE(binding.HasResultIndex()); - YQL_ENSURE(binding.GetMkqlIndex() < transformCtx.MkqlResults.size()); - - const NKikimrMiniKQL::TType* type; - const NKikimrMiniKQL::TValue* value; - GetKikimrUnpackedRunResult(*transformCtx.MkqlResults[binding.GetMkqlIndex()], binding.GetResultIndex(), - type, value); - YQL_ENSURE(type); - YQL_ENSURE(value); - - return NDq::TMkqlValueRef(*type, *value); -} - -NKikimrMiniKQL::TParams BuildParamFromResult(const NKikimrKqp::TParameterBinding& binding, - const TKqlTransformContext& transformCtx) -{ - auto valueRef = GetParamFromResult(binding, transformCtx); - - NKikimrMiniKQL::TParams param; - param.MutableType()->CopyFrom(valueRef.GetType()); - param.MutableValue()->CopyFrom(valueRef.GetValue()); - return param; -} - -bool GetPredicateValue(const TKiConditionalEffect& effect, const TKqlTransformContext& transformCtx, - const THashMap<TString, NKikimrKqp::TParameterBinding>& bindingsMap) -{ - YQL_ENSURE(effect.Predicate().Maybe<TCoParameter>()); - auto paramName = effect.Predicate().Cast<TCoParameter>().Name().Value(); - - auto binding = bindingsMap.FindPtr(paramName); - YQL_ENSURE(binding); - YQL_ENSURE(binding->HasMkqlIndex()); - - auto paramValue = GetParamFromResult(*binding, transformCtx); - YQL_ENSURE(paramValue.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::Data); - YQL_ENSURE(paramValue.GetType().GetData().GetScheme() == NKikimr::NScheme::NTypeIds::Bool); - return paramValue.GetValue().GetBool(); -} - -bool ProcessEffect(TExprBase& effect, const THashMap<TString, NKikimrKqp::TParameterBinding>& bindingsMap, - const TKqlTransformContext& transformCtx, TExprContext& ctx) -{ - TOptimizeExprSettings optSettings(nullptr); - optSettings.VisitChanges = true; - TExprNode::TPtr output; - auto status = OptimizeExpr(effect.Ptr(), output, - [&bindingsMap, &transformCtx](const TExprNode::TPtr& input, TExprContext& ctx) { - TExprBase node(input); - - if (auto maybeRevert = node.Maybe<TKiRevertIf>()) { - auto revert = maybeRevert.Cast(); - auto predicateValue = GetPredicateValue(revert, transformCtx, bindingsMap); - - if (predicateValue) { - ctx.AddWarning(YqlIssue(ctx.GetPosition(revert.Pos()), TIssuesIds::KIKIMR_OPERATION_REVERTED, TStringBuilder() - << "Operation reverted due to constraint violation: " << revert.Constraint().Value())); - - return GetEmptyEffectsList(revert.Pos(), ctx).Ptr(); - } - - return revert.Effect().Ptr(); - } - - if (auto maybeAbort = node.Maybe<TKiAbortIf>()) { - auto abort = maybeAbort.Cast(); - auto predicateValue = GetPredicateValue(abort, transformCtx, bindingsMap); - - if (predicateValue) { - ctx.AddError(YqlIssue(ctx.GetPosition(abort.Pos()), TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, TStringBuilder() - << "Operation aborted due to constraint violation: " << abort.Constraint().Value())); - - return TExprNode::TPtr(); - } - - return abort.Effect().Ptr(); - } - - return input; - }, ctx, optSettings); - - if (status == IGraphTransformer::TStatus::Ok) { - effect = TExprBase(output); - return true; - } - - YQL_ENSURE(status == IGraphTransformer::TStatus::Error); - return false; -} - -NNodes::TExprBase PreserveParams(NNodes::TExprBase node, - const THashMap<TString, NKikimrKqp::TParameterBinding>& bindingsMap, TExprContext& ctx, - TKqpTransactionState& txState, const TKqlTransformContext& transformCtx, bool preserveValues) -{ - TNodeOnNodeOwnedMap replaceMap; - THashMap<TString, TExprBase> resultsMap; - VisitExpr(node.Ptr(), [&replaceMap, &resultsMap, &ctx, &bindingsMap, &txState, &transformCtx, preserveValues] (const TExprNode::TPtr& node) { - if (auto maybeParam = TMaybeNode<TCoParameter>(node)) { - auto param = maybeParam.Cast(); - auto name = TString(param.Name().Value()); - - auto bindingPtr = bindingsMap.FindPtr(name); - YQL_ENSURE(bindingPtr); - - auto& binding = *bindingPtr; - - TMaybe<NKikimrMiniKQL::TParams> paramValue; - TMaybeNode<TExprBase> resultNode; - if (preserveValues) { - if (binding.HasMkqlIndex()) { - paramValue = BuildParamFromResult(binding, transformCtx); - } else { - if (!txState.Tx().ParamsState->Values.contains(binding.GetName())) { - auto clientParam = transformCtx.QueryCtx->Parameters.FindPtr(binding.GetName()); - YQL_ENSURE(clientParam); - paramValue = *clientParam; - } - } - } else { - if (!param.Ref().HasResult() && binding.HasMkqlIndex()) { - resultNode = Build<TCoAtomList>(ctx, param.Pos()) - .Add().Build(ToString(binding.GetMkqlIndex())) - .Add().Build(ToString(binding.GetResultIndex())) - .Done(); - } - } - - if (!paramValue && !resultNode) { - return true; - } - - auto newParamName = txState.Tx().NewParamName(); - - auto newParamNode = Build<TCoParameter>(ctx, node->Pos()) - .Name().Build(newParamName) - .Type(param.Type()) - .Done(); - - replaceMap.emplace(node.Get(), newParamNode.Ptr()); - - if (paramValue) { - YQL_ENSURE(txState.Tx().ParamsState->Values.emplace(std::make_pair(newParamName, *paramValue)).second); - } - - if (resultNode) { - YQL_ENSURE(resultsMap.emplace(std::make_pair(newParamName, resultNode.Cast())).second); - } - } - - return true; - }); - - auto newNode = TExprBase(ctx.ReplaceNodes(node.Ptr(), replaceMap)); - - if (!resultsMap.empty()) { - VisitExpr(newNode.Ptr(), [&resultsMap] (const TExprNode::TPtr& node) { - if (auto maybeParam = TMaybeNode<TCoParameter>(node)) { - auto param = maybeParam.Cast(); - auto name = TString(param.Name().Value()); - - auto result = resultsMap.FindPtr(name); - if (result) { - param.Ptr()->SetResult(result->Ptr()); - } - } - - return true; - }); - } - - return newNode; -} - -class TKqpExecTransformer : public TGraphTransformerBase { -public: - TKqpExecTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) - : Gateway(gateway) - , Cluster(cluster) - , TxState(txState) - , TransformCtx(transformCtx) {} - - TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - output = input; - - const auto& analyzeResults = TransformCtx->AnalyzeResults; - - TVector<TExprBase> results; - for (auto& root : analyzeResults.ExecutionRoots) { - if (root.Scope) { - ctx.AddError(TIssue(ctx.GetPosition(root.Node.Pos()), TStringBuilder() - << "Unexpected nested execution roots in rewritten program.")); - return TStatus::Error; - } - - results.push_back(TExprBase(root.Node)); - } - - TMaybeNode<TExprList> resultsNode; - if (analyzeResults.CanExecute) { - auto kqpProgram = TKiProgram(input); - - if (HasEffects(kqpProgram)) { - bool preserveParams = !TransformCtx->Settings.GetCommitTx(); - if (!AddDeferredEffect(kqpProgram.Effects(), ctx, *TxState, *TransformCtx, preserveParams)) { - return TStatus::Error; - } - } - - if (!HasResults(kqpProgram)) { - TransformCtx->MkqlResults.push_back(nullptr); - return TStatus::Ok; - } - - if (TransformCtx->Settings.GetCommitTx() && !TransformCtx->QueryCtx->PrepareOnly) { - if (TxState->Tx().DeferredEffects.Empty()) { - // Merge read-only query with commit tx - return TStatus::Ok; - } - } - - resultsNode = kqpProgram.Results(); - } else { - YQL_ENSURE(!results.empty()); - resultsNode = Build<TExprList>(ctx, input->Pos()) - .Add(results) - .Done(); - } - - auto effectsNode = Build<TCoAsList>(ctx, input->Pos()) - .Add<TCoIf>() - .Predicate<TCoParameter>() - .Name().Build(LocksAcquireParamName) - .Type<TCoDataType>() - .Type().Build("Bool") - .Build() - .Build() - .ThenValue<TKiAcquireLocks>() - .LockTxId<TCoParameter>() - .Name().Build(LocksTxIdParamName) - .Type<TCoDataType>() - .Type().Build("Uint64") - .Build() - .Build() - .Build() - .ElseValue<TCoVoid>() - .Build() - .Build() - .Done(); - - YQL_ENSURE(resultsNode); - auto program = Build<TKiProgram>(ctx, input->Pos()) - .Results(resultsNode.Cast()) - .Effects(effectsNode) - .Done(); - - Promise = NewPromise(); - MkqlExecuteResult = ExecuteMkql(program, Gateway, Cluster, ctx, TxState, TransformCtx, false); - - auto promise = Promise; - MkqlExecuteResult.Future.Apply([promise](const TFuture<IKqpGateway::TMkqlResult> future) mutable { - YQL_ENSURE(future.HasValue()); - promise.SetValue(); - }); - - return TStatus::Async; - } - - TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { - Y_UNUSED(input); - return Promise.GetFuture(); - } - - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - output = input; - - NKikimr::NKqp::IKqpGateway::TMkqlResult result(MkqlExecuteResult.Future.ExtractValue()); - - result.ReportIssues(ctx.IssueManager); - if (!result.Success()) { - return TStatus::Error; - } - - auto mkqlResult = MakeSimpleShared<NKikimrMiniKQL::TResult>(); - mkqlResult->Swap(&result.Result); - - TransformCtx->MkqlResults.push_back(mkqlResult); - - if (TransformCtx->QueryCtx->PrepareOnly) { - YQL_ENSURE(!TransformCtx->GetPreparingKql().GetMkqls().empty()); - auto& mkql = *TransformCtx->GetPreparingKql().MutableMkqls()->rbegin(); - mkql.SetProgram(result.CompiledProgram); - mkql.SetProgramText(MkqlExecuteResult.Program); - } else { - LogMkqlResult(*mkqlResult, ctx); - TransformCtx->AddMkqlStats(MkqlExecuteResult.Program, std::move(result.TxStats)); - - if (TxState->Tx().EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { - if (!UnpackMergeLocks(*mkqlResult, TxState->Tx(), ctx)) { - return TStatus::Error; - } - } - } - - if (TransformCtx->AnalyzeResults.CanExecute) { - output = Build<TKiProgram>(ctx, input->Pos()) - .Results() - .Build() - .Effects(TKiProgram(input).Effects()) - .Done() - .Ptr(); - } - - return TStatus::Ok; - } - - void Rewind() override {} - -private: - TIntrusivePtr<IKqpGateway> Gateway; - TString Cluster; - TIntrusivePtr<TKqpTransactionState> TxState; - TIntrusivePtr<TKqlTransformContext> TransformCtx; - TMkqlExecuteResult MkqlExecuteResult; - TPromise<void> Promise; -}; - -void ExtractQueryStats(NKqpProto::TKqpStatsQuery& dst, const NKikimrQueryStats::TTxStats& txStats) { - auto& dstExec = *dst.AddExecutions(); - NKqpProto::TKqpExecutionExtraStats executionExtraStats; - - dstExec.SetDurationUs(txStats.GetDurationUs()); - dstExec.SetCpuTimeUs(txStats.GetComputeCpuTimeUsec()); - - auto& dstComputeTime = *executionExtraStats.MutableComputeCpuTimeUs(); - dstComputeTime.SetMin(txStats.GetComputeCpuTimeUsec()); - dstComputeTime.SetMax(txStats.GetComputeCpuTimeUsec()); - dstComputeTime.SetSum(txStats.GetComputeCpuTimeUsec()); - dstComputeTime.SetCnt(1); - - { - i64 cnt = 0; - ui64 minCpu = Max<ui64>(); - ui64 maxCpu = 0; - ui64 sumCpu = 0; - ui64 sumReadSets = 0; - ui64 maxProgramSize = 0; - ui64 maxReplySize = 0; - for (const auto& perShard : txStats.GetPerShardStats()) { - ui64 cpu = perShard.GetCpuTimeUsec(); - minCpu = Min(minCpu, cpu); - maxCpu = Max(maxCpu, cpu); - sumCpu += cpu; - sumReadSets += perShard.GetOutgoingReadSetsCount(); - maxProgramSize = Max(maxProgramSize, perShard.GetProgramSize()); - maxReplySize = Max(maxReplySize, perShard.GetReplySize()); - ++cnt; - } - if (cnt) { - auto& dstShardTime = *executionExtraStats.MutableShardsCpuTimeUs(); - dstShardTime.SetMin(minCpu); - dstShardTime.SetMax(maxCpu); - dstShardTime.SetSum(sumCpu); - dstShardTime.SetCnt(cnt); - - dst.SetReadSetsCount(dst.GetReadSetsCount() + sumReadSets); - dst.SetMaxShardProgramSize(Max(dst.GetMaxShardProgramSize(), maxProgramSize)); - dst.SetMaxShardReplySize(Max(dst.GetMaxShardReplySize(), maxReplySize)); - - dstExec.SetCpuTimeUs(dstExec.GetCpuTimeUs() + sumCpu); - } - } - - ui32 affectedShards = 0; - for (auto& table : txStats.GetTableAccessStats()) { - auto& dstTable = *dstExec.AddTables(); - dstTable.SetTablePath(table.GetTableInfo().GetName()); - dstTable.SetReadRows(table.GetSelectRow().GetRows() + table.GetSelectRange().GetRows()); - dstTable.SetReadBytes(table.GetSelectRow().GetBytes() + table.GetSelectRange().GetBytes()); - dstTable.SetWriteRows(table.GetUpdateRow().GetRows()); - dstTable.SetWriteBytes(table.GetUpdateRow().GetBytes()); - dstTable.SetEraseRows(table.GetEraseRow().GetRows()); - dstTable.SetAffectedPartitions(table.GetShardCount()); - - // NOTE: This might be incorrect in case when single shard has several - // tables, i.e. collocated tables. - affectedShards += table.GetShardCount(); - } - - executionExtraStats.SetAffectedShards(affectedShards); - - dstExec.MutableExtra()->PackFrom(executionExtraStats); -} - -} // namespace - -void TKqlTransformContext::AddMkqlStats(const TString& program, NKikimrQueryStats::TTxStats&& txStats) { - Y_UNUSED(program); - - ExtractQueryStats(QueryStats, txStats); -} - -IKqpGateway::TMkqlSettings TKqlTransformContext::GetMkqlSettings(bool hasDataEffects, TInstant now) const { - IKqpGateway::TMkqlSettings mkqlSettings; - mkqlSettings.LlvmRuntime = false; - mkqlSettings.CollectStats = QueryCtx->StatsMode >= EKikimrStatsMode::Basic; - - if (hasDataEffects) { - mkqlSettings.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef(); - } - - auto& deadlines = QueryCtx->Deadlines; - if (deadlines.CancelAt) { - mkqlSettings.CancelAfterMs = now < deadlines.CancelAt ? (deadlines.CancelAt - now).MilliSeconds() : 1; - } - if (deadlines.TimeoutAt) { - mkqlSettings.TimeoutMs = now < deadlines.TimeoutAt ? (deadlines.TimeoutAt - now).MilliSeconds() : 1; - } - - mkqlSettings.Limits = QueryCtx->Limits.PhaseLimits; - - return mkqlSettings; -} - -TMkqlExecuteResult ExecuteMkql(TKiProgram program, TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TExprContext& ctx, TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, - bool hasDataEffects) -{ - auto mkqlProgram = TranslateToMkql(program, ctx, TString(ReadTargetParamName)); - if (!mkqlProgram) { - return TMkqlExecuteResult(MakeFuture(ResultFromError<IKqpGateway::TMkqlResult>( - "Mkql translation failed.", ctx.GetPosition(program.Pos())))); - } - - auto mkqlProgramText = NCommon::SerializeExpr(ctx, mkqlProgram.Cast().Ref()); - TFuture<IKqpGateway::TMkqlResult> future; - - auto paramBindings = CollectParams(mkqlProgram.Cast()); - - if (transformCtx->QueryCtx->PrepareOnly) { - YQL_CLOG(INFO, ProviderKqp) << "Preparing MiniKQL program:" << Endl << mkqlProgramText; - - auto& mkql = *transformCtx->GetPreparingKql().AddMkqls(); - for (auto& binding : paramBindings) { - mkql.AddBindings()->CopyFrom(binding); - } - - mkql.SetIsPure(!hasDataEffects && IsKqlPureExpr(program)); - - future = gateway->PrepareMkql(cluster, mkqlProgramText); - } else { - YQL_CLOG(INFO, ProviderKqp) << "Executing MiniKQL program:" << Endl << mkqlProgramText; - - bool acquireLocks = *txState->Tx().EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE - && !txState->Tx().Locks.Broken(); - auto execParams = BuildParamsMap(paramBindings, txState, transformCtx, acquireLocks); - - if (YQL_CLOG_ACTIVE(TRACE, ProviderKqp)) { - TStringBuilder paramsTextBuilder; - for (auto& pair : execParams.Values) { - TString paramText; - NProtoBuf::TextFormat::PrintToString(pair.second.GetValue(), ¶mText); - paramsTextBuilder << pair.first << ": " << paramText << Endl; - } - - YQL_CLOG(TRACE, ProviderKqp) << "MiniKQL parameters:" << Endl << paramsTextBuilder; - } - - future = gateway->ExecuteMkql(cluster, mkqlProgramText, std::move(execParams), - transformCtx->GetMkqlSettings(hasDataEffects, gateway->GetCurrentTime()), txState->Tx().GetSnapshot()); - } - - return TMkqlExecuteResult(mkqlProgramText, future); -} - -bool AddDeferredEffect(NNodes::TExprBase effect, const TVector<NKikimrKqp::TParameterBinding>& bindings, - TExprContext& ctx, TKqpTransactionState& txState, TKqlTransformContext& transformCtx, bool preserveParamValues) -{ - if (transformCtx.QueryCtx->PrepareOnly) { - auto& newEffect = *transformCtx.GetPreparingKql().AddEffects(); - newEffect.SetNodeAst(NCommon::SerializeExpr(ctx, effect.Ref())); - for (auto& binding : bindings) { - newEffect.AddBindings()->CopyFrom(binding); - } - } else { - if (txState.Tx().Locks.Broken()) { - txState.Tx().Locks.ReportIssues(ctx); - return false; - } - THashMap<TString, NKikimrKqp::TParameterBinding> bindingsMap; - for (auto& binding : bindings) { - bindingsMap.emplace(binding.GetName(), binding); - } - - if (!ProcessEffect(effect, bindingsMap, transformCtx, ctx)) { - return false; - } - - effect = PreserveParams(effect, bindingsMap, ctx, txState, transformCtx, preserveParamValues); - } - - bool added = txState.Tx().AddDeferredEffect(effect); - YQL_ENSURE(added, "Cannot execute new- and old- execution engine queries in the same transaction"); - - YQL_CLOG(INFO, ProviderKqp) << "Adding deferred effect, total " << txState.Tx().DeferredEffects.Size() << ": " - << Endl << KqpExprToPrettyString(effect, ctx); - return true; -} - -bool AddDeferredEffect(NNodes::TExprBase effect, TExprContext& ctx, TKqpTransactionState& txState, - TKqlTransformContext& transformCtx, bool preserveParamValues) -{ - return AddDeferredEffect(effect, CollectParams(effect), ctx, txState, transformCtx, preserveParamValues); -} - -TKqpParamsMap BuildParamsMap(const TVector<NKikimrKqp::TParameterBinding>& bindings, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool acquireLocks) -{ - TKqpParamsMap paramsMap(std::make_shared<TTransformState>(txState, transformCtx)); - - for (auto& binding : bindings) { - auto name = binding.GetName(); - - TMaybe<NDq::TMkqlValueRef> paramRef; - if (binding.GetName() == LocksAcquireParamName) { - auto& param = txState->Tx().ParamsState->Values[LocksAcquireParamName]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<bool>::Id); - param.MutableValue()->SetBool(acquireLocks); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == LocksTxIdParamName) { - auto& param = txState->Tx().ParamsState->Values[LocksTxIdParamName]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui64>::Id); - param.MutableValue()->SetUint64(txState->Tx().Locks.GetLockTxId()); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == ReadTargetParamName) { - YQL_ENSURE(txState->Tx().EffectiveIsolationLevel); - - ui32 readTarget = (ui32)NKikimr::TReadTarget::EMode::Online; - switch (*txState->Tx().EffectiveIsolationLevel) { - case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED: - readTarget = (ui32)NKikimr::TReadTarget::EMode::Head; - break; - case NKikimrKqp::ISOLATION_LEVEL_READ_STALE: - readTarget = (ui32)NKikimr::TReadTarget::EMode::Follower; - break; - default: - break; - } - - auto& param = txState->Tx().ParamsState->Values[ReadTargetParamName]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui32>::Id); - param.MutableValue()->SetUint32(readTarget); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == NowParamName) { - auto& param = txState->Tx().ParamsState->Values[binding.GetName()]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui64>::Id); - param.MutableValue()->SetUint64(transformCtx->QueryCtx->GetCachedNow()); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == CurrentDateParamName) { - auto& param = txState->Tx().ParamsState->Values[binding.GetName()]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TDate>::Id); - ui64 date = transformCtx->QueryCtx->GetCachedDate(); - YQL_ENSURE(date <= Max<ui32>()); - param.MutableValue()->SetUint32(static_cast<ui32>(date)); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == CurrentDatetimeParamName) { - auto& param = txState->Tx().ParamsState->Values[binding.GetName()]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TDatetime>::Id); - ui64 datetime = transformCtx->QueryCtx->GetCachedDatetime(); - YQL_ENSURE(datetime <= Max<ui32>()); - param.MutableValue()->SetUint32(static_cast<ui32>(datetime)); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == CurrentTimestampParamName) { - auto& param = txState->Tx().ParamsState->Values[binding.GetName()]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TTimestamp>::Id); - param.MutableValue()->SetUint64(transformCtx->QueryCtx->GetCachedTimestamp()); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == RandomNumberParamName) { - auto& param = txState->Tx().ParamsState->Values[binding.GetName()]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui64>::Id); - param.MutableValue()->SetUint64(transformCtx->QueryCtx->GetCachedRandom<ui64>()); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == RandomParamName) { - auto& param = txState->Tx().ParamsState->Values[binding.GetName()]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<double>::Id); - param.MutableValue()->SetDouble(transformCtx->QueryCtx->GetCachedRandom<double>()); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.GetName() == RandomUuidParamName) { - auto& param = txState->Tx().ParamsState->Values[binding.GetName()]; - param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TUuid>::Id); - auto uuid = transformCtx->QueryCtx->GetCachedRandom<TGUID>(); - param.MutableValue()->SetBytes(uuid.dw, sizeof(TGUID)); - paramRef = NDq::TMkqlValueRef(param); - } else if (binding.HasMkqlIndex()) { - paramRef = GetParamFromResult(binding, *transformCtx); - } else { - auto clientParam = transformCtx->QueryCtx->Parameters.FindPtr(binding.GetName()); - if (clientParam) { - paramRef = NDq::TMkqlValueRef(*clientParam); - } else { - auto paramValue = txState->Tx().ParamsState->Values.FindPtr(binding.GetName()); - YQL_ENSURE(paramValue, "Parameter not found: " << binding.GetName()); - - paramRef = NDq::TMkqlValueRef(*paramValue); - } - } - - YQL_ENSURE(paramRef); - auto result = paramsMap.Values.emplace(std::make_pair(name, *paramRef)); - YQL_ENSURE(result.second); - } - - return paramsMap; -} - TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) { TStringBuilder message; message << "Transaction locks invalidated."; @@ -791,54 +126,5 @@ bool UnpackMergeLocks(const NKikimrMiniKQL::TResult& result, TKqpTransactionCont return false; } -void LogMkqlResult(const NKikimrMiniKQL::TResult& result, TExprContext& ctx) { - Y_UNUSED(ctx); - - if (YQL_CLOG_ACTIVE(TRACE, ProviderKqp)) { - TString resultType; - TString resultValue; - - NProtoBuf::TextFormat::PrintToString(result.GetType(), &resultType); - NProtoBuf::TextFormat::PrintToString(result.GetValue(), &resultValue); - - YQL_CLOG(TRACE, ProviderKqp) << "MiniKQL results\n" - << "Type:\n" << resultType - << "Value:\n" << resultValue; - } -} - -bool HasEffects(const TKiProgram& program) { - return !program.Effects().Maybe<TCoList>(); -} - -bool HasResults(const TKiProgram& program) { - return !program.Results().Empty(); -} - -TVector<NKikimrKqp::TParameterBinding> CollectParams(TExprBase query) { - TSet<TStringBuf> parametersSet; - TVector<NKikimrKqp::TParameterBinding> bindings; - - VisitExpr(query.Ptr(), [&bindings, ¶metersSet] (const TExprNode::TPtr& node) { - if (auto maybeParam = TMaybeNode<TCoParameter>(node)) { - auto param = maybeParam.Cast(); - auto result = parametersSet.insert(param.Name()); - if (result.second) { - bindings.push_back(GetParameterBinding(param)); - } - } - - return true; - }); - - return bindings; -} - -TAutoPtr<IGraphTransformer> CreateKqpExecTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) -{ - return new TKqpExecTransformer(gateway, cluster, txState, transformCtx); -} - } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/prepare/kqp_query_finalize.cpp b/ydb/core/kqp/prepare/kqp_query_finalize.cpp deleted file mode 100644 index 5e442780f7a..00000000000 --- a/ydb/core/kqp/prepare/kqp_query_finalize.cpp +++ /dev/null @@ -1,638 +0,0 @@ -#include "kqp_prepare_impl.h" - -#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> -#include <ydb/core/tx/datashard/sys_tables.h> - -#include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/core/issue/yql_issue.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NCommon; -using namespace NYql::NNodes; -using namespace NThreading; - -namespace { - -const TStringBuf LocksInvalidatedResultName = "tx_locks_invalidated"; -const TStringBuf LocksInvalidatedListName = "tx_locks_invalidated_list"; -const TStringBuf LocksTableName = "/sys/locks2"; -const TStringBuf LocksTableVersion = "0"; -const TString LocksTablePathId = TKikimrPathId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks2).ToString(); -const ui64 LocksInvalidatedCount = 1; - -TExprBase GetDeferredEffectsList(const TDeferredEffects& effects, TPositionHandle pos, TExprContext& ctx) { - if (effects.Empty()) { - return GetEmptyEffectsList(pos, ctx); - } - - TVector<TExprBase> effectNodes; - effectNodes.reserve(effects.Size()); - for (const auto& effect : effects) { - YQL_ENSURE(effect.Params.empty()); - YQL_ENSURE(effect.Node); - effectNodes.push_back(effect.Node.Cast()); - } - - return Build<TCoExtend>(ctx, pos) - .Add(effectNodes) - .Done(); -} - -TExprBase GetEraseLocksEffects(const TString& cluster, TPositionHandle pos, TCoParameter locksList, TExprContext& ctx) { - return Build<TKiMapParameter>(ctx, pos) - .Input(locksList) - .Lambda() - .Args({"lockItem"}) - .Body<TKiEraseRow>() - .Cluster().Build(cluster) - .Table<TKiVersionedTable>() - .Path<TCoAtom>().Build(LocksTableName) - .SchemaVersion<TCoAtom>().Build(LocksTableVersion) - .PathId<TCoAtom>().Build(LocksTablePathId) - .Build() - .Key() - .Add() - .Name().Build("LockId") - .Value<TCoMember>() - .Struct("lockItem") - .Name().Build("LockId") - .Build() - .Build() - .Add() - .Name().Build("DataShard") - .Value<TCoMember>() - .Struct("lockItem") - .Name().Build("DataShard") - .Build() - .Build() - .Add() - .Name().Build("SchemeShard") - .Value<TCoMember>() - .Struct("lockItem") - .Name().Build("SchemeShard") - .Build() - .Build() - .Add() - .Name().Build("PathId") - .Value<TCoMember>() - .Struct("lockItem") - .Name().Build("PathId") - .Build() - .Build() - .Build() - .Build() - .Build() - .Done(); -} - -const TTypeAnnotationNode* GetTxLockListType(TExprContext& ctx) { - auto ui32Type = ctx.MakeType<TDataExprType>(EDataSlot::Uint32); - auto ui64Type = ctx.MakeType<TDataExprType>(EDataSlot::Uint64); - TVector<const TItemExprType*> lockItems; - lockItems.reserve(6); - lockItems.push_back(ctx.MakeType<TItemExprType>("LockId", ui64Type)); - lockItems.push_back(ctx.MakeType<TItemExprType>("DataShard", ui64Type)); - lockItems.push_back(ctx.MakeType<TItemExprType>("SchemeShard", ui64Type)); - lockItems.push_back(ctx.MakeType<TItemExprType>("PathId", ui64Type)); - lockItems.push_back(ctx.MakeType<TItemExprType>("Generation", ui32Type)); - lockItems.push_back(ctx.MakeType<TItemExprType>("Counter", ui64Type)); - auto lockType = ctx.MakeType<TStructExprType>(lockItems); - auto lockListType = ctx.MakeType<TListExprType>(lockType); - return lockListType; -} - -class TKqpFinalizeTransformer : public TGraphTransformerBase { -public: - TKqpFinalizeTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) - : Gateway(gateway) - , Cluster(cluster) - , TxState(txState) - , TransformCtx(transformCtx) - , HasProgramResults(false) {} - - TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - output = input; - - YQL_ENSURE(State == EFinalizeState::Initial); - - auto resultsNode = GetResultsNode(TExprBase(input), ctx); - TExprBase effectsNode = GetEmptyEffectsList(input->Pos(), ctx); - - auto settings = TransformCtx->Settings; - - if (settings.GetRollbackTx()) { - YQL_ENSURE(!settings.GetCommitTx()); - - YQL_CLOG(INFO, ProviderKqp) << "Rollback Tx" - << ", deferred effects count: " << TxState->Tx().DeferredEffects.Size() - << ", locks count: " << TxState->Tx().Locks.Size(); - - effectsNode = GetRollbackEffects(input->Pos(), ctx); - State = EFinalizeState::RollbackInProgress; - } - - bool hasDataEffects = false; - if (settings.GetCommitTx()) { - YQL_ENSURE(!settings.GetRollbackTx()); - - effectsNode = GetCommitEffects(input->Pos(), ctx, hasDataEffects); - - { - if (!CheckCommitEffects(effectsNode, ctx)) { - return RollbackOnError(ctx); - } - - if (TxState->Tx().IsInvalidated()) { - ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::KIKIMR_OPERATION_ABORTED, TStringBuilder() - << "Failed to commit transaction due to previous errors.")); - return RollbackOnError(ctx); - } - } - - YQL_CLOG(INFO, ProviderKqp) << "Commit Tx" - << ", deferred effects count: " << TxState->Tx().DeferredEffects.Size() - << ", locks count: " << TxState->Tx().Locks.Size(); - - State = EFinalizeState::CommitInProgress; - } - - auto program = Build<TKiProgram>(ctx, input->Pos()) - .Results(resultsNode) - .Effects(effectsNode) - .Done(); - - HasProgramResults = HasResults(program); - if (!HasProgramResults && !HasEffects(program)) { - if (State != EFinalizeState::Initial) { - ResetTxState(State == EFinalizeState::CommitInProgress); - } - - State = EFinalizeState::Initial; - return TStatus::Ok; - } - - if (TransformCtx->QueryCtx->PrepareOnly) { - YQL_ENSURE(!HasProgramResults); - State = EFinalizeState::Initial; - return TStatus::Ok; - } - - return ExecuteProgram(program, ctx, hasDataEffects, ShouldWaitForResults(hasDataEffects)); - } - - TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { - Y_UNUSED(input); - return Promise.GetFuture(); - } - - TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - output = input; - - NKikimr::NKqp::IKqpGateway::TMkqlResult result(MkqlExecuteResult.Future.ExtractValue()); - bool success = result.Success(); - - if (success) { - LogMkqlResult(result.Result, ctx); - TransformCtx->AddMkqlStats(MkqlExecuteResult.Program, std::move(result.TxStats)); - } - - switch (State) { - case EFinalizeState::CommitInProgress: { - TMaybe<TKqpTxLock> invalidatedLock; - bool locksOk = success - ? CheckInvalidatedLocks(result.Result, invalidatedLock) - : true; - - success = success && locksOk; - - if (!success) { - result.ReportIssues(ctx.IssueManager); - - if (!locksOk) { - ctx.AddError(GetLocksInvalidatedIssue(TxState->Tx(), invalidatedLock)); - } - } else { - result.ReportIssues(ctx.IssueManager); - - auto mkqlResult = MakeSimpleShared<NKikimrMiniKQL::TResult>(); - mkqlResult->Swap(&result.Result); - - if (HasProgramResults) { - TransformCtx->MkqlResults.push_back(mkqlResult); - } - } - - break; - } - - case EFinalizeState::RollbackInProgress: { - success = true; - break; - } - - case EFinalizeState::RollbackOnErrorInProgress: { - success = false; - break; - } - - default: - YQL_ENSURE(false, "Unexpected state in finalize transformer."); - break; - } - - ResetTxState(State == EFinalizeState::CommitInProgress); - State = EFinalizeState::Initial; - - return success ? TStatus::Ok : TStatus::Error; - } - - void Rewind() override { - State = EFinalizeState::Initial; - } - -private: - enum class EFinalizeState { - Initial, - CommitInProgress, - RollbackInProgress, - RollbackOnErrorInProgress - }; - - TStatus ExecuteProgram(TKiProgram program, TExprContext& ctx, bool hasDataEffects, bool waitForResults) { - if (waitForResults) { - Promise = NewPromise(); - MkqlExecuteResult = ExecuteMkql(program, Gateway, Cluster, ctx, TxState, TransformCtx, hasDataEffects); - - auto promise = Promise; - MkqlExecuteResult.Future.Apply([promise](const TFuture<IKqpGateway::TMkqlResult> future) mutable { - YQL_ENSURE(future.HasValue()); - promise.SetValue(); - }); - - return TStatus::Async; - } - - YQL_ENSURE(!hasDataEffects); - - ExecuteMkql(program, Gateway, Cluster, ctx, TxState, TransformCtx, false); - - ResetTxState(true); - State = EFinalizeState::Initial; - - return TStatus::Ok; - } - - TStatus RollbackOnError(TExprContext& ctx) { - YQL_ENSURE(State == EFinalizeState::Initial); - - YQL_CLOG(INFO, ProviderKqp) << "Rollback Tx On Error" - << ", deferred effects count: " << TxState->Tx().DeferredEffects.Size() - << ", locks count: " << TxState->Tx().Locks.Size(); - - auto program = Build<TKiProgram>(ctx, TPositionHandle()) - .Results() - .Build() - .Effects(GetRollbackEffects(TPositionHandle(), ctx)) - .Done(); - - if (!HasEffects(program)) { - ResetTxState(false); - - return TStatus::Error; - } - - State = EFinalizeState::RollbackOnErrorInProgress; - return ExecuteProgram(program, ctx, false, true); - } - - TExprList GetResultsNode(TExprBase program, TExprContext& ctx) { - TMaybeNode<TExprList> resultsNode; - - if (program.Maybe<TCoWorld>()) { - resultsNode = Build<TExprList>(ctx, program.Pos()).Done(); - } else { - const auto& analyzeResults = TransformCtx->AnalyzeResults; - YQL_ENSURE(analyzeResults.CanExecute); - - resultsNode = program.Cast<TKiProgram>().Results(); - } - - return resultsNode.Cast(); - } - - bool CheckCommitEffects(TExprBase effects, TExprContext& ctx) const { - TIssueScopeGuard issueScope(ctx.IssueManager, [effects, &ctx]() { - return MakeIntrusive<TIssue>(YqlIssue(ctx.GetPosition(effects.Pos()), TIssuesIds::DEFAULT_ERROR, - "Failed to commit transaction")); - }); - - TMaybeNode<TExprBase> blackistedNode; - ui32 readsCount = 0; - VisitExpr(effects.Ptr(), [&blackistedNode, &readsCount](const TExprNode::TPtr& exprNode) { - if (blackistedNode) { - return false; - } - - if (auto maybeCallable = TMaybeNode<TCallable>(exprNode)) { - auto callable = maybeCallable.Cast(); - - if (callable.CallableName() == "Udf" || - callable.Maybe<TKiSelectRange>()) - { - blackistedNode = callable; - return false; - } - - if (callable.Maybe<TKiSelectRow>()) { - ++readsCount; - } - } - - return true; - }); - - if (blackistedNode) { - ctx.AddError(TIssue(ctx.GetPosition(blackistedNode.Cast().Pos()), TStringBuilder() - << "Callable not expected in commit tx: " << blackistedNode.Cast<TCallable>().CallableName())); - return false; - } - - ui32 maxReadsCount = TransformCtx->Config->_CommitReadsLimit.Get().GetRef(); - if (readsCount > maxReadsCount) { - ctx.AddError(TIssue(ctx.GetPosition(effects.Pos()), TStringBuilder() - << "Reads limit exceeded in commit tx: " << readsCount << " > " << maxReadsCount)); - return false; - } - - return true; - } - - static bool CheckInvalidatedLocks(const NKikimrMiniKQL::TResult& result, TMaybe<TKqpTxLock>& invalidatedLock) { - auto structType = result.GetType().GetStruct(); - - ui32 resultIndex; - if (!GetRunResultIndex(structType, TString(LocksInvalidatedResultName), resultIndex)) { - return true; - } - - auto locksResult = result.GetValue().GetStruct(resultIndex); - if (locksResult.HasOptional()) { - bool invalidated = locksResult.GetOptional().GetBool(); - YQL_ENSURE(invalidated); - - ui32 listIndex; - if (GetRunResultIndex(structType, TString(LocksInvalidatedListName), listIndex)) { - auto& locksResult = result.GetValue().GetStruct(listIndex); - auto& list = locksResult.GetOptional().GetList(); - if (!list.empty()) { - invalidatedLock = TKqpTxLock(list.Get(0)); - } - } - - return false; - } - - return true; - } - - void ResetTxState(bool committed) { - if (!committed) { - TxState->Tx().Invalidate(); - } - - TxState->Tx().ClearDeferredEffects(); - TxState->Tx().Locks.Clear(); - TxState->Tx().Finish(); - } - - TExprBase GetRollbackEffects(TPositionHandle pos, TExprContext& ctx) { - if (!TxState->Tx().Locks.HasLocks()) { - return GetEmptyEffectsList(pos, ctx); - } - - auto locksParamName = TxState->Tx().NewParamName(); - YQL_ENSURE(TxState->Tx().ParamsState->Values.emplace(std::make_pair(locksParamName, - GetLocksParamValue(TxState->Tx().Locks))).second); - - auto locksParamNode = Build<TCoParameter>(ctx, pos) - .Name().Build(locksParamName) - .Type(ExpandType(pos, *GetTxLockListType(ctx), ctx)) - .Done(); - - return GetEraseLocksEffects(Cluster, pos, locksParamNode, ctx); - } - - TExprBase GetCommitEffects(TPositionHandle pos, TExprContext& ctx, bool& hasDataEffects) { - hasDataEffects = !TxState->Tx().DeferredEffects.Empty(); - - Y_VERIFY_DEBUG(!hasDataEffects || !TxState->Tx().Locks.Broken()); - - auto deferredEffects = GetDeferredEffectsList(TxState->Tx().DeferredEffects, pos, ctx); - - if (!TxState->Tx().Locks.HasLocks()) { - return deferredEffects; - } - - auto locksParamName = TxState->Tx().NewParamName(); - YQL_ENSURE(TxState->Tx().ParamsState->Values.emplace(std::make_pair(locksParamName, - GetLocksParamValue(TxState->Tx().Locks))).second); - - auto locksParamNode = Build<TCoParameter>(ctx, pos) - .Name().Build(locksParamName) - .Type(ExpandType(pos, *GetTxLockListType(ctx), ctx)) - .Done(); - - if (!hasDataEffects && TxState->Tx().GetSnapshot().IsValid()) - return GetEraseLocksEffects(Cluster, pos, locksParamNode, ctx); - - auto lockArg = Build<TCoArgument>(ctx, pos) - .Name("lockArg") - .Done(); - - auto selectLock = Build<TKiSelectRow>(ctx, pos) - .Cluster().Build(Cluster) - .Table<TKiVersionedTable>() - .Path<TCoAtom>().Build(LocksTableName) - .SchemaVersion<TCoAtom>().Build(LocksTableVersion) - .PathId<TCoAtom>().Build(LocksTablePathId) - .Build() - .Key() - .Add() - .Name().Build("LockId") - .Value<TCoMember>() - .Struct(lockArg) - .Name().Build("LockId") - .Build() - .Build() - .Add() - .Name().Build("DataShard") - .Value<TCoMember>() - .Struct(lockArg) - .Name().Build("DataShard") - .Build() - .Build() - .Add() - .Name().Build("SchemeShard") - .Value<TCoMember>() - .Struct(lockArg) - .Name().Build("SchemeShard") - .Build() - .Build() - .Add() - .Name().Build("PathId") - .Value<TCoMember>() - .Struct(lockArg) - .Name().Build("PathId") - .Build() - .Build() - .Build() - .Select() - .Add().Build("Generation") - .Add().Build("Counter") - .Build() - .Done(); - - TVector<TExprBase> args = { - Build<TCoCmpEqual>(ctx, pos) - .Left<TCoMember>() - .Struct(selectLock) - .Name().Build("Generation") - .Build() - .Right<TCoMember>() - .Struct(lockArg) - .Name().Build("Generation") - .Build() - .Done(), - Build<TCoCmpEqual>(ctx, pos) - .Left<TCoMember>() - .Struct(selectLock) - .Name().Build("Counter") - .Build() - .Right<TCoMember>() - .Struct(lockArg) - .Name().Build("Counter") - .Build() - .Done() - }; - auto lockPredicate = Build<TCoNot>(ctx, pos) - .Value<TCoCoalesce>() - .Predicate<TCoAnd>() - .Add(args) - .Build() - .Value<TCoBool>() - .Literal().Build("false") - .Build() - .Build() - .Done(); - - auto locksInvalidatedList = Build<TKiFlatMapParameter>(ctx, pos) - .Input(locksParamNode) - .Lambda() - .Args(lockArg) - .Body<TCoListIf>() - .Predicate(lockPredicate) - .Value(lockArg) - .Build() - .Build() - .Done(); - - auto locksInvalidatedPredicate = Build<TCoHasItems>(ctx, pos) - .List(locksInvalidatedList) - .Done(); - - auto effects = Build<TCoExtend>(ctx, pos) - .Add<TCoIf>() - .Predicate(locksInvalidatedPredicate) - .ThenValue<TCoAsList>() - .Add<TKiSetResult>() - .Name().Build(LocksInvalidatedResultName) - .Data(locksInvalidatedPredicate) - .Build() - .Add<TKiSetResult>() - .Name().Build(LocksInvalidatedListName) - .Data<TCoTake>() - .Input(locksInvalidatedList) - .Count<TCoUint64>() - .Literal().Build(ToString(LocksInvalidatedCount)) - .Build() - .Build() - .Build() - .Build() - .ElseValue(deferredEffects) - .Build() - .Add(GetEraseLocksEffects(Cluster, pos, locksParamNode, ctx)) - .Done(); - - return effects; - } - - bool ShouldWaitForResults(bool hasDataEffects) { - if (State != EFinalizeState::CommitInProgress) { - return true; - } - - if (hasDataEffects) { - return true; - } - - if (!TxState->Tx().GetSnapshot().IsValid()) { - return true; - } - - if (HasProgramResults) { - return true; - } - - return false; - } - -private: - TIntrusivePtr<IKqpGateway> Gateway; - TString Cluster; - TIntrusivePtr<TKqpTransactionState> TxState; - TIntrusivePtr<TKqlTransformContext> TransformCtx; - EFinalizeState State; - bool HasProgramResults; - TMkqlExecuteResult MkqlExecuteResult; - TPromise<void> Promise; -}; - -} // namespace - -TCoList GetEmptyEffectsList(const TPositionHandle pos, TExprContext& ctx) { - return Build<TCoList>(ctx, pos) - .ListType<TCoListType>() - .ItemType<TCoVoidType>() - .Build() - .Build() - .Done(); -} - -NKikimrMiniKQL::TParams GetLocksParamValue(const TKqpTxLocks& locks) { - YQL_ENSURE(locks.HasLocks()); - - NKikimrMiniKQL::TParams locksResult; - auto type = locksResult.MutableType(); - type->SetKind(NKikimrMiniKQL::ETypeKind::List); - type->MutableList()->CopyFrom(locks.LocksListType); - auto value = locksResult.MutableValue(); - for (auto& pair : locks.LocksMap) { - auto lockValue = value->AddList(); - lockValue->CopyFrom(pair.second.GetValue()); - } - - return locksResult; -} - -TAutoPtr<IGraphTransformer> CreateKqpFinalizeTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, - TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) -{ - return new TKqpFinalizeTransformer(gateway, cluster, txState, transformCtx); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/prepare/kqp_query_plan.cpp b/ydb/core/kqp/prepare/kqp_query_plan.cpp index cfbd16d0f99..068f352dbfb 100644 --- a/ydb/core/kqp/prepare/kqp_query_plan.cpp +++ b/ydb/core/kqp/prepare/kqp_query_plan.cpp @@ -1,4 +1,6 @@ -#include "kqp_prepare_impl.h" +#include "kqp_prepare.h" + +#include "kqp_query_plan.h" #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> @@ -30,7 +32,7 @@ using namespace NClient; namespace { struct TTableRead { - ETableReadType Type = ETableReadType::Unspecified; + EPlanTableReadType Type = EPlanTableReadType::Unspecified; TVector<TString> LookupBy; TVector<TString> ScanBy; TVector<TString> Columns; @@ -39,7 +41,7 @@ struct TTableRead { }; struct TTableWrite { - ETableWriteType Type = ETableWriteType::Unspecified; + EPlanTableWriteType Type = EPlanTableWriteType::Unspecified; TVector<TString> Keys; TVector<TString> Columns; }; @@ -49,6 +51,17 @@ struct TTableInfo { TVector<TTableWrite> Writes; }; +struct TExprScope { + NYql::NNodes::TCallable Callable; + NYql::NNodes::TCoLambda Lambda; + ui32 Depth; + + TExprScope(NYql::NNodes::TCallable callable, NYql::NNodes::TCoLambda Lambda, ui32 depth) + : Callable(callable) + , Lambda(Lambda) + , Depth(depth) {} +}; + struct TSerializerCtx { TSerializerCtx(TExprContext& exprCtx, const TString& cluster, const TIntrusivePtr<NYql::TKikimrTablesData> tablesData, @@ -211,12 +224,12 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab if (read.LookupBy.empty()) { read.Type = TKikimrKeyRange::IsFull(selectRange.Range()) - ? ETableReadType::FullScan - : ETableReadType::Scan; + ? EPlanTableReadType::FullScan + : EPlanTableReadType::Scan; } else { read.Type = mapDepth > 0 - ? ETableReadType::MultiLookup - : ETableReadType::Lookup; + ? EPlanTableReadType::MultiLookup + : EPlanTableReadType::Lookup; } tables[TString(selectRange.Table().Path())].Reads.push_back(read); @@ -227,8 +240,8 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab TTableRead read; read.Type = mapDepth > 0 - ? ETableReadType::MultiLookup - : ETableReadType::Lookup; + ? EPlanTableReadType::MultiLookup + : EPlanTableReadType::Lookup; for (const auto& key : selectRow.Key()) { auto lookup = TStringBuilder() << key.Name().Value() @@ -248,8 +261,8 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab TTableWrite write; write.Type = mapDepth > 0 - ? ETableWriteType::MultiUpsert - : ETableWriteType::Upsert; + ? EPlanTableWriteType::MultiUpsert + : EPlanTableWriteType::Upsert; for (const auto& tuple : updateRow.Key()) { @@ -270,8 +283,8 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab TTableWrite write; write.Type = mapDepth > 0 - ? ETableWriteType::MultiErase - : ETableWriteType::Erase; + ? EPlanTableWriteType::MultiErase + : EPlanTableWriteType::Erase; for (const auto& tuple : eraseRow.Key()) { auto key = TStringBuilder() << tuple.Name().Value() @@ -683,7 +696,7 @@ private: auto tableLookup = maybeTableLookup.Cast(); TTableRead readInfo; - readInfo.Type = ETableReadType::Lookup; + readInfo.Type = EPlanTableReadType::Lookup; planNode.TypeName = "TableLookup"; TString table(tableLookup.Table().Path().Value()); auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table); @@ -935,7 +948,7 @@ private: op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table; TTableWrite writeInfo; - writeInfo.Type = ETableWriteType::MultiUpsert; + writeInfo.Type = EPlanTableWriteType::MultiUpsert; for (const auto& column : upsert.Columns()) { writeInfo.Columns.push_back(TString(column.Value())); } @@ -954,7 +967,7 @@ private: op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table; TTableWrite writeInfo; - writeInfo.Type = ETableWriteType::MultiErase; + writeInfo.Type = EPlanTableWriteType::MultiErase; SerializerCtx.Tables[table].Writes.push_back(writeInfo); planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]); @@ -1029,7 +1042,7 @@ private: ui32 Visit(const TKqlLookupTableBase& lookup, TQueryPlanNode& planNode) { auto table = TString(lookup.Table().Path().Value()); TTableRead readInfo; - readInfo.Type = ETableReadType::Lookup; + readInfo.Type = EPlanTableReadType::Lookup; TOperator op; op.Properties["Name"] = "TablePointLookup"; @@ -1059,7 +1072,7 @@ private: auto rangesDesc = PrettyExprStr(read.Ranges()); if (rangesDesc == "Void" || explainPrompt.UsedKeyColumns.empty()) { - readInfo.Type = ETableReadType::FullScan; + readInfo.Type = EPlanTableReadType::FullScan; auto& ranges = op.Properties["ReadRanges"]; for (const auto& col : tableData.Metadata->KeyColumnNames) { @@ -1069,7 +1082,7 @@ private: ranges.AppendValue(rangeDesc); } } else if (auto maybeResultBinding = ContainResultBinding(rangesDesc)) { - readInfo.Type = ETableReadType::Scan; + readInfo.Type = EPlanTableReadType::Scan; auto [txId, resId] = *maybeResultBinding; if (auto result = GetResult(txId, resId)) { @@ -1143,7 +1156,7 @@ private: } ui32 operatorId; - if (readInfo.Type == ETableReadType::FullScan) { + if (readInfo.Type == EPlanTableReadType::FullScan) { op.Properties["Name"] = "TableFullScan"; operatorId = AddOperator(planNode, "TableFullScan", std::move(op)); } else { @@ -1244,9 +1257,9 @@ private: // Scan which fixes only few first members of compound primary key were called "Lookup" // by older explain version. We continue to do so. if (readInfo.LookupBy.size() > 0) { - readInfo.Type = ETableReadType::Lookup; + readInfo.Type = EPlanTableReadType::Lookup; } else { - readInfo.Type = hasRangeScans ? ETableReadType::Scan : ETableReadType::FullScan; + readInfo.Type = hasRangeScans ? EPlanTableReadType::Scan : EPlanTableReadType::FullScan; } } @@ -1277,13 +1290,13 @@ private: SerializerCtx.Tables[table].Reads.push_back(readInfo); ui32 operatorId; - if (readInfo.Type == ETableReadType::Scan) { + if (readInfo.Type == EPlanTableReadType::Scan) { op.Properties["Name"] = "TableRangeScan"; operatorId = AddOperator(planNode, "TableRangeScan", std::move(op)); - } else if (readInfo.Type == ETableReadType::FullScan) { + } else if (readInfo.Type == EPlanTableReadType::FullScan) { op.Properties["Name"] = "TableFullScan"; operatorId = AddOperator(planNode, "TableFullScan", std::move(op)); - } else if (readInfo.Type == ETableReadType::Lookup) { + } else if (readInfo.Type == EPlanTableReadType::Lookup) { op.Properties["Name"] = "TablePointLookup"; operatorId = AddOperator(planNode, "TablePointLookup", std::move(op)); } else { diff --git a/ydb/core/kqp/prepare/kqp_query_plan.h b/ydb/core/kqp/prepare/kqp_query_plan.h index 71af7c99cc9..828cb5ce5c6 100644 --- a/ydb/core/kqp/prepare/kqp_query_plan.h +++ b/ydb/core/kqp/prepare/kqp_query_plan.h @@ -12,6 +12,22 @@ namespace NKikimr { namespace NKqp { +enum class EPlanTableReadType { + Unspecified, + FullScan, + Scan, + Lookup, + MultiLookup, +}; + +enum class EPlanTableWriteType { + Unspecified, + Upsert, + MultiUpsert, + Erase, + MultiErase, +}; + void WriteKqlPlan(NJsonWriter::TBuf& writer, const NYql::TExprNode::TPtr& query); /* diff --git a/ydb/core/kqp/prepare/kqp_query_rewrite.cpp b/ydb/core/kqp/prepare/kqp_query_rewrite.cpp deleted file mode 100644 index cb24a1b8a1d..00000000000 --- a/ydb/core/kqp/prepare/kqp_query_rewrite.cpp +++ /dev/null @@ -1,398 +0,0 @@ -#include "kqp_prepare_impl.h" - -#include <ydb/library/yql/core/yql_expr_optimize.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NNodes; - -namespace { - -template<typename TMapNode> -TExprBase RebuildMapToList(TMapNode map, TExprContext& ctx) { - if (map.Lambda().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List && - map.Input().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) - { - return map; - } - - bool isOptional = map.Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional; - - if (map.Lambda().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) { - auto newBody = Build<TCoToList>(ctx, map.Pos()) - .Optional(map.Lambda().Body()) - .Done(); - - map = Build<TMapNode>(ctx, map.Pos()) - .Input(map.Input()) - .Lambda() - .Args({"item"}) - .template Body<TExprApplier>() - .Apply(newBody) - .With(map.Lambda().Args().Arg(0), "item") - .Build() - .Build() - .Done(); - } - - if (map.Input().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) { - map = Build<TMapNode>(ctx, map.Pos()) - .template Input<TCoToList>() - .Optional(map.Input()) - .Build() - .Lambda(map.Lambda()) - .Done(); - } - - if (isOptional) { - return Build<TCoToOptional>(ctx, map.Pos()) - .List(map) - .Done(); - } - - YQL_CLOG(INFO, ProviderKqp) << "RebuildMapToList"; - return map; -} - -TExprNode::TPtr NormalizeCallables(TExprBase node, TExprContext& ctx, const TKqpAnalyzeResults& analyzeResults) { - if (!analyzeResults.CallableToExecRootsMap.contains(node.Raw())) { - return node.Ptr(); - } - - if (node.Maybe<TCoMap>() || node.Maybe<TCoFlatMap>()) { - return node.Maybe<TCoMap>() - ? RebuildMapToList<TCoMap>(node.Cast<TCoMap>(), ctx).Ptr() - : RebuildMapToList<TCoFlatMap>(node.Cast<TCoFlatMap>(), ctx).Ptr(); - } - - if (auto filter = node.Maybe<TCoFilter>()) { - YQL_CLOG(INFO, ProviderKqp) << "NormalizeCallables: Filter"; - return Build<TCoFlatMap>(ctx, node.Pos()) - .Input(filter.Cast().Input()) - .Lambda() - .Args({"item"}) - .Body<TCoIf>() - .Predicate<TExprApplier>() - .Apply(filter.Cast().Lambda()) - .With(0, "item") - .Build() - .ThenValue<TCoJust>() - .Input("item") - .Build() - .ElseValue<TCoNothing>() - .OptionalType<TCoOptionalType>() - .ItemType<TCoTypeOf>() - .Value("item") - .Build() - .Build() - .Build() - .Build() - .Build() - .Done() - .Ptr(); - } - - return node.Ptr(); -} - -TExprNode::TPtr ToListOverToOptional(TExprBase node, TExprContext& ctx) { - Y_UNUSED(ctx); - - if (auto toList = node.Maybe<TCoToList>()) { - if (auto toOpt = toList.Cast().Optional().Maybe<TCoToOptional>()) { - YQL_CLOG(INFO, ProviderKqp) << "ToListOverToOptional"; - return toOpt.Cast().List().Ptr(); - } - if (auto toOpt = toList.Cast().Optional().Maybe<TCoHead>()) { - YQL_CLOG(INFO, ProviderKqp) << "ToListOverHead"; - return toOpt.Cast().Input().Ptr(); - } - if (auto toOpt = toList.Cast().Optional().Maybe<TCoLast>()) { - YQL_CLOG(INFO, ProviderKqp) << "ToListOverLast"; - return toOpt.Cast().Input().Ptr(); - } - } - - return node.Ptr(); -} - -template<typename TInner, typename TOuter> -TExprBase SplitMap(TExprBase input, TCoLambda lambda, TExprContext& ctx, const TVector<TExprBase> execRoots, - const TKqpAnalyzeResults& analyzeResults) -{ - auto exprRootsTuple = Build<TExprList>(ctx, lambda.Pos()) - .Add(execRoots) - .Done(); - - auto isSameScope = [lambda] (const TNodeInfo* nodeInfo) { - if (!nodeInfo) { - return true; - } - - if (!nodeInfo->Scope || nodeInfo->Scope->Lambda.Raw() != lambda.Raw()) { - return false; - } - - return true; - }; - - THashSet<const TExprNode*> innerNodes; - innerNodes.insert(lambda.Args().Arg(0).Raw()); - VisitExpr(exprRootsTuple.Ptr(), - [&innerNodes, &isSameScope, &analyzeResults] (const TExprNode::TPtr& node) { - auto* nodeInfo = analyzeResults.ExprToNodeInfoMap.FindPtr(node.Get()); - - if (!isSameScope(nodeInfo)) { - return false; - } - - if (node->IsCallable()) { - innerNodes.insert(node.Get()); - } - - return true; - }); - - THashMap<const TExprNode*, TExprBase> jointsMap; - for (TExprBase root : execRoots) { - jointsMap.insert(std::make_pair(root.Raw(), root)); - } - - VisitExpr(lambda.Body().Ptr(), - [&innerNodes, &jointsMap, isSameScope, &analyzeResults] (const TExprNode::TPtr& node) { - auto* nodeInfo = analyzeResults.ExprToNodeInfoMap.FindPtr(node.Get()); - YQL_ENSURE(nodeInfo); - - YQL_ENSURE(node->GetTypeAnn()); - bool isComputable = node->IsComputable(); - - if (innerNodes.contains(node.Get())) { - if (isComputable) { - jointsMap.insert(std::make_pair(node.Get(), TExprBase(node))); - } - - return false; - } - - bool hasInnerChild = false; - if (isComputable && !node->Children().empty() && nodeInfo->IsExecutable) { - for (const auto& child : node->Children()) { - if (jointsMap.contains(child.Get())) { - return true; - } - - bool innerChild = innerNodes.contains(child.Get()); - hasInnerChild = hasInnerChild | innerChild; - - YQL_ENSURE(child->GetTypeAnn()); - if (child->GetTypeAnn()->IsComputable() && !innerChild) { - return true; - } - } - - if (hasInnerChild) { - jointsMap.insert(std::make_pair(node.Get(), TExprBase(node))); - return false; - } - } - - return true; - }); - - TVector<TExprBase> jointNodes; - jointNodes.reserve(jointsMap.size()); - for (auto& pair : jointsMap) { - jointNodes.push_back(pair.second); - } - - auto innerLambdaBody = Build<TExprList>(ctx, lambda.Pos()) - .Add(jointNodes) - .Done(); - - auto innerLambda = Build<TCoLambda>(ctx, lambda.Pos()) - .Args({"item"}) - .Body<TExprApplier>() - .Apply(innerLambdaBody) - .With(lambda.Args().Arg(0), "item") - .Build() - .Done(); - - TCoArgument outerLambdaArg = Build<TCoArgument>(ctx, lambda.Pos()) - .Name("item") - .Done(); - - TNodeOnNodeOwnedMap replaceMap; - for (size_t i = 0; i < jointNodes.size(); ++i) { - auto node = Build<TCoNth>(ctx, lambda.Pos()) - .Tuple(outerLambdaArg) - .Index().Build(i) - .Done(); - - replaceMap.emplace(jointNodes[i].Raw(), node.Ptr()); - } - - auto outerLambdaBody = ctx.ReplaceNodes(lambda.Body().Ptr(), replaceMap); - auto outerLambda = Build<TCoLambda>(ctx, lambda.Pos()) - .Args({outerLambdaArg}) - .Body(TExprBase(outerLambdaBody)) - .Done(); - - return Build<TOuter>(ctx, input.Pos()) - .template Input<TInner>() - .Input(input) - .Lambda(innerLambda) - .Build() - .Lambda(outerLambda) - .Done(); -} - -TExprNode::TPtr SplitMap(TExprBase mapNode, TExprContext& ctx, const TVector<TExprBase>& execRoots, - const TKqpAnalyzeResults& analyzeResults) -{ - YQL_ENSURE(mapNode.Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List); - - if (auto map = mapNode.Maybe<TCoMap>()) { - YQL_CLOG(INFO, ProviderKqp) << "SplitMap: Map, MapParameter"; - return SplitMap<TCoMap, TKiMapParameter>(map.Cast().Input(), map.Cast().Lambda(), ctx, execRoots, - analyzeResults).Ptr(); - } - - if (auto map = mapNode.Maybe<TCoFlatMap>()) { - YQL_CLOG(INFO, ProviderKqp) << "SplitMap: Map, FlatMapParameter"; - YQL_ENSURE(map.Cast().Lambda().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List); - return SplitMap<TCoMap, TKiFlatMapParameter>(map.Cast().Input(), map.Cast().Lambda(), ctx, execRoots, - analyzeResults).Ptr(); - } - - if (auto map = mapNode.Maybe<TKiMapParameter>()) { - YQL_CLOG(INFO, ProviderKqp) << "SplitMap: MapParameter, MapParameter"; - return SplitMap<TKiMapParameter, TKiMapParameter>(map.Cast().Input(), map.Cast().Lambda(), - ctx, execRoots, analyzeResults).Ptr(); - } - - if (auto map = mapNode.Maybe<TKiFlatMapParameter>()) { - YQL_CLOG(INFO, ProviderKqp) << "SplitMap: MapParameter, FlatMapParameter"; - return SplitMap<TKiMapParameter, TKiFlatMapParameter>(map.Cast().Input(), map.Cast().Lambda(), - ctx, execRoots, analyzeResults).Ptr(); - } - - YQL_ENSURE(false); - return nullptr; -} - -TExprNode::TPtr UnnestExecutionRoots(TExprBase node, TExprContext& ctx, const TKqpAnalyzeResults& analyzeResults) { - auto execRoots = analyzeResults.CallableToExecRootsMap.FindPtr(node.Raw()); - - if (!execRoots) { - return node.Ptr(); - } - - if (auto maybeMap = node.Maybe<TCoMap>()) { - auto map = maybeMap.Cast(); - if (map.Input().Maybe<TCoParameter>()) { - YQL_CLOG(INFO, ProviderKqp) << "UnnestExecutionRoots: Map parameter"; - return Build<TKiMapParameter>(ctx, node.Pos()) - .Input(map.Input()) - .Lambda(map.Lambda()) - .Done() - .Ptr(); - } - } - - if (auto maybeFlatMap = node.Maybe<TCoFlatMap>()) { - auto flatMap = maybeFlatMap.Cast(); - if (flatMap.Input().Maybe<TCoParameter>()) { - YQL_CLOG(INFO, ProviderKqp) << "UnnestExecutionRoots: FlatMap parameter"; - return Build<TKiFlatMapParameter>(ctx, node.Pos()) - .Input(flatMap.Input()) - .Lambda(flatMap.Lambda()) - .Done() - .Ptr(); - } - } - - if (node.Maybe<TCoMap>() || - node.Maybe<TCoFlatMap>() || - node.Maybe<TKiMapParameter>() || - node.Maybe<TKiFlatMapParameter>()) - { - return SplitMap(node, ctx, *execRoots, analyzeResults); - } - - ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder() - << "Can't rewrite callable for KQP execution: " << node.Ptr()->Content())); - YQL_ENSURE(false); - return nullptr; -} - -class TKqpRewriteTransformer : public TSyncTransformerBase { -public: - TKqpRewriteTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx) - : TransformCtx(transformCtx) {} - - TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - const auto& analyzeResults = TransformCtx->AnalyzeResults; - - if (analyzeResults.CallableToExecRootsMap.empty()) { - return TStatus::Ok; - } - - TOptimizeExprSettings optSettings(nullptr); - optSettings.VisitChanges = true; - TStatus status = TStatus::Error; - - status = OptimizeExpr(input, output, - [&analyzeResults](const TExprNode::TPtr& input, TExprContext& ctx) { - auto ret = input; - TExprBase node(input); - - ret = NormalizeCallables(node, ctx, analyzeResults); - if (ret != input) { - return ret; - } - - ret = ToListOverToOptional(node, ctx); - if (ret != input) { - return ret; - } - - return ret; - }, ctx, optSettings); - YQL_ENSURE(status == TStatus::Ok); - - if (input != output) { - return TStatus(TStatus::Repeat, true); - } - - status = OptimizeExpr(input, output, - [&analyzeResults](const TExprNode::TPtr& input, TExprContext& ctx) { - auto ret = input; - TExprBase node(input); - - ret = UnnestExecutionRoots(node, ctx, analyzeResults); - if (ret != input) { - return ret; - } - - return ret; - }, ctx, optSettings); - YQL_ENSURE(status == TStatus::Ok); - - YQL_ENSURE(input != output); - return TStatus(TStatus::Repeat, true); - } - -private: - TIntrusivePtr<TKqlTransformContext> TransformCtx; -}; - -} // namespace - -TAutoPtr<IGraphTransformer> CreateKqpRewriteTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx) { - return new TKqpRewriteTransformer(transformCtx); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/prepare/kqp_query_simplify.cpp b/ydb/core/kqp/prepare/kqp_query_simplify.cpp deleted file mode 100644 index d8fefeb9393..00000000000 --- a/ydb/core/kqp/prepare/kqp_query_simplify.cpp +++ /dev/null @@ -1,372 +0,0 @@ -#include "kqp_prepare_impl.h" - -#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> - -#include <ydb/library/yql/core/yql_expr_optimize.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NNodes; - -namespace { - -template<typename TMapNode> -TExprBase BuildMap(TExprBase input, TExprBase body, TCoArgument arg, TExprContext& ctx) { - return Build<TMapNode>(ctx, input.Pos()) - .Input(input) - .Lambda() - .Args({"item"}) - .template Body<TExprApplier>() - .Apply(body) - .With(arg, "item") - .Build() - .Build() - .Done(); -} - -TExprNode::TPtr ExtractFilter(TExprBase node, TExprContext& ctx) { - if (!node.Maybe<TCoFlatMap>()) { - return node.Ptr(); - } - - auto flatmap = node.Cast<TCoFlatMap>(); - auto flatmapArg = flatmap.Lambda().Args().Arg(0); - - if (flatmap.Input().Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) { - return node.Ptr(); - } - - if (auto maybeConditional = flatmap.Lambda().Body().Maybe<TCoConditionalValueBase>()) { - auto conditional = maybeConditional.Cast(); - - auto blacklistedNode = FindNode(conditional.Predicate().Ptr(), [](const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - - if (node.Maybe<TKiSelectRow>() || node.Maybe<TKiSelectRangeBase>()) { - return true; - } - - return false; - }); - - if (blacklistedNode) { - return node.Ptr(); - } - - auto filter = Build<TCoFilter>(ctx, node.Pos()) - .Input(flatmap.Input()) - .Lambda() - .Args({"item"}) - .Body<TExprApplier>() - .Apply(conditional.Predicate()) - .With(flatmapArg, "item") - .Build() - .Build() - .Done(); - - auto value = conditional.Value(); - - if (conditional.Maybe<TCoListIf>() || conditional.Maybe<TCoOptionalIf>()) { - return BuildMap<TCoMap>(filter, value, flatmapArg, ctx).Ptr(); - } else { - return BuildMap<TCoFlatMap>(filter, value, flatmapArg, ctx).Ptr(); - } - } - - if (auto maybeConditional = flatmap.Lambda().Body().Maybe<TCoIf>()) { - auto conditional = maybeConditional.Cast(); - - auto elseValue = conditional.ElseValue(); - if (!elseValue.Maybe<TCoList>() && !elseValue.Maybe<TCoNothing>()) { - return node.Ptr(); - } - - auto blacklistedNode = FindNode(conditional.Predicate().Ptr(), [](const TExprNode::TPtr& exprNode) { - auto node = TExprBase(exprNode); - - if (node.Maybe<TKiSelectRow>() || node.Maybe<TKiSelectRangeBase>()) { - return true; - } - - return false; - }); - - if (blacklistedNode) { - return node.Ptr(); - } - - auto filter = Build<TCoFilter>(ctx, node.Pos()) - .Input(flatmap.Input()) - .Lambda() - .Args({"item"}) - .Body<TExprApplier>() - .Apply(conditional.Predicate()) - .With(flatmapArg, "item") - .Build() - .Build() - .Done(); - - auto value = conditional.ThenValue(); - - return BuildMap<TCoFlatMap>(filter, value, flatmapArg, ctx).Ptr(); - } - - return node.Ptr(); -} - -TExprNode::TPtr ExtractCombineByKeyPreMap(TExprBase node, TExprContext& ctx) { - if (auto maybeCombine = node.Maybe<TCoCombineByKey>()) { - auto combine = maybeCombine.Cast(); - if (!IsKqlPureLambda(combine.PreMapLambda())) { - return Build<TCoCombineByKey>(ctx, node.Pos()) - .Input<TCoMap>() - .Input(combine.Input()) - .Lambda(combine.PreMapLambda()) - .Build() - .PreMapLambda() - .Args({"item"}) - .Body("item") - .Build() - .KeySelectorLambda(combine.KeySelectorLambda()) - .InitHandlerLambda(combine.InitHandlerLambda()) - .UpdateHandlerLambda(combine.UpdateHandlerLambda()) - .FinishHandlerLambda(combine.FinishHandlerLambda()) - .Done() - .Ptr(); - } - } - - return node.Ptr(); -} - -template <class TPartitionsType> -TExprNode::TPtr ExtractPartitionByKeyListHandler(TExprBase node, TExprContext& ctx) { - if (auto maybePartition = node.Maybe<TPartitionsType>()) { - auto partition = maybePartition.Cast(); - if (!IsKqlPureLambda(partition.ListHandlerLambda())) { - auto newPartition = Build<TPartitionsType>(ctx, node.Pos()) - .Input(partition.Input()) - .KeySelectorLambda(partition.KeySelectorLambda()) - .SortDirections(partition.SortDirections()) - .SortKeySelectorLambda(partition.SortKeySelectorLambda()) - .ListHandlerLambda() - .Args({"list"}) - .Body("list") - .Build() - .Done(); - - return Build<TExprApplier>(ctx, node.Pos()) - .Apply(partition.ListHandlerLambda().Body()) - .With(partition.ListHandlerLambda().Args().Arg(0), newPartition) - .Done() - .Ptr(); - } - } - - return node.Ptr(); -} - -template<typename TMapType> -TExprNode::TPtr MergeMapsWithSameLambda(TExprBase node, TExprContext& ctx) { - if (!node.Maybe<TCoExtend>()) { - return node.Ptr(); - } - - bool hasInputsToMerge = false; - TVector<std::pair<TMaybeNode<TCoLambda>, TVector<TExprBase>>> inputs; - - auto extend = node.Cast<TCoExtend>(); - for (const auto& list : extend) { - TMaybeNode<TExprBase> input; - TMaybeNode<TCoLambda> lambda; - - bool buildList = false; - if (auto maybeMap = list.Maybe<TMapType>()) { - lambda = maybeMap.Cast().Lambda(); - input = maybeMap.Cast().Input(); - - YQL_ENSURE(input.Ref().GetTypeAnn()); - buildList = input.Cast().Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional; - } else if (auto maybeMap = list.Maybe<TCoToList>().Optional().Maybe<TMapType>()) { - lambda = maybeMap.Cast().Lambda(); - input = maybeMap.Cast().Input(); - buildList = true; - } - - if (buildList) { - input = Build<TCoToList>(ctx, node.Pos()) - .Optional(input.Cast()) - .Done(); - } - - if (lambda && !IsKqlPureLambda(lambda.Cast())) { - if (!inputs.empty() && inputs.back().first && inputs.back().first.Cast().Raw() == lambda.Cast().Raw()) { - inputs.back().second.push_back(input.Cast()); - hasInputsToMerge = true; - } else { - inputs.emplace_back(lambda, TVector<TExprBase>{input.Cast()}); - } - } else { - inputs.emplace_back(TMaybeNode<TCoLambda>(), TVector<TExprBase>{list}); - } - } - - if (!hasInputsToMerge) { - return node.Ptr(); - } - - TVector<TExprBase> merged; - for (const auto& pair : inputs) { - const auto& lambda = pair.first; - const auto& lists = pair.second; - - if (lambda) { - YQL_ENSURE(!lists.empty()); - auto mergedInput = lists.front(); - - if (lists.size() > 1) { - mergedInput = Build<TCoExtend>(ctx, node.Pos()) - .Add(lists) - .Done(); - } - - auto mergedMap = Build<TMapType>(ctx, node.Pos()) - .Input(mergedInput) - .Lambda() - .Args({"item"}) - .template Body<TExprApplier>() - .Apply(lambda.Cast()) - .With(0, "item") - .Build() - .Build() - .Done(); - - merged.push_back(mergedMap); - } else { - YQL_ENSURE(lists.size() == 1); - merged.push_back(lists.front()); - } - } - - YQL_ENSURE(!merged.empty()); - auto ret = merged.front(); - if (merged.size() > 1) { - ret = Build<TCoExtend>(ctx, node.Pos()) - .Add(merged) - .Done(); - } - - return ret.Ptr(); -} - -TExprNode::TPtr RewritePresentIfToFlatMap(TExprBase node, TExprContext& ctx) { - if (!node.Maybe<TCoIfPresent>()) { - return node.Ptr(); - } - auto ifPresent = node.Cast<TCoIfPresent>(); - - if (IsKqlPureLambda(ifPresent.PresentHandler())) { - return node.Ptr(); - } - - if (!ifPresent.MissingValue().Maybe<TCoNothing>()) { - return node.Ptr(); - } - - return Build<TCoFlatMap>(ctx, node.Pos()) - .Input(ifPresent.Optional()) - .Lambda() - .Args({"item"}) - .Body<TExprApplier>() - .Apply(ifPresent.PresentHandler()) - .With(ifPresent.PresentHandler().Args().Arg(0), "item") - .Build() - .Build() - .Done() - .Ptr(); -} - -class TKqpSimplifyTransformer : public TSyncTransformerBase { -public: - TKqpSimplifyTransformer() - : Simplified(false) {} - - TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - if (Simplified) { - return TStatus::Ok; - } - - TOptimizeExprSettings optSettings(nullptr); - optSettings.VisitChanges = false; - TStatus status = OptimizeExpr(input, output, - [](const TExprNode::TPtr& input, TExprContext& ctx) { - auto ret = input; - TExprBase node(input); - - ret = MergeMapsWithSameLambda<TCoMap>(node, ctx); - if (ret != input) { - return ret; - } - - ret = MergeMapsWithSameLambda<TCoFlatMap>(node, ctx); - if (ret != input) { - return ret; - } - - ret = ExtractFilter(node, ctx); - if (ret != input) { - return ret; - } - - ret = ExtractCombineByKeyPreMap(node, ctx); - if (ret != input) { - return ret; - } - - ret = ExtractPartitionByKeyListHandler<TCoPartitionByKey>(node, ctx); - if (ret != input) { - return ret; - } - - ret = ExtractPartitionByKeyListHandler<TCoPartitionsByKeys>(node, ctx); - if (ret != input) { - return ret; - } - - ret = RewritePresentIfToFlatMap(node, ctx); - if (ret != input) { - return ret; - } - - return ret; - }, ctx, optSettings); - - if (input != output) { - return TStatus(TStatus::Repeat, true); - } - - if (status == TStatus::Ok) { - Simplified = true; - } - - return status; - } - - void Rewind() override { - Simplified = false; - } - -private: - bool Simplified; -}; - -} // namespace - -TAutoPtr<IGraphTransformer> CreateKqpSimplifyTransformer() { - return new TKqpSimplifyTransformer(); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/prepare/kqp_query_substitute.cpp b/ydb/core/kqp/prepare/kqp_query_substitute.cpp deleted file mode 100644 index 6d289bf78b9..00000000000 --- a/ydb/core/kqp/prepare/kqp_query_substitute.cpp +++ /dev/null @@ -1,73 +0,0 @@ -#include "kqp_prepare_impl.h" - -#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> - -#include <ydb/library/yql/core/yql_expr_optimize.h> - -namespace NKikimr { -namespace NKqp { - -using namespace NYql; -using namespace NYql::NNodes; - -namespace { - -class TKqpSubstituteTransformer : public TSyncTransformerBase { -public: - TKqpSubstituteTransformer(TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx) - : TxState(txState) - , TransformCtx(transformCtx) {} - - TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - const auto& analyzeResults = TransformCtx->AnalyzeResults; - - if (analyzeResults.CanExecute) { - return TStatus::Ok; - } - - TNodeOnNodeOwnedMap replaceMap; - for (size_t i = 0; i < analyzeResults.ExecutionRoots.size(); ++i) { - auto newParamName = TxState->Tx().NewParamName(); - - auto node = analyzeResults.ExecutionRoots[i].Node; - - YQL_ENSURE(node.Ref().GetTypeAnn()); - auto paramNode = Build<TCoParameter>(ctx, node.Pos()) - .Name().Build(newParamName) - .Type(ExpandType(node.Pos(), *node.Ref().GetTypeAnn(), ctx)) - .Done(); - - YQL_ENSURE(!TransformCtx->MkqlResults.empty()); - ui32 mkqlIndex = TransformCtx->MkqlResults.size() - 1; - ui32 resultIndex = i; - auto indexTuple = Build<TCoAtomList>(ctx, paramNode.Pos()) - .Add().Build(ToString(mkqlIndex)) - .Add().Build(ToString(resultIndex)) - .Done(); - - paramNode.Ptr()->SetResult(indexTuple.Ptr()); - - replaceMap.emplace(node.Raw(), paramNode.Ptr()); - } - - output = ctx.ReplaceNodes(std::move(input), replaceMap); - - return TStatus(TStatus::Repeat, true); - } - -private: - TIntrusivePtr<TKqpTransactionState> TxState; - TIntrusivePtr<TKqlTransformContext> TransformCtx; -}; - -} // namespace - -TAutoPtr<IGraphTransformer> CreateKqpSubstituteTransformer(TIntrusivePtr<TKqpTransactionState> txState, - TIntrusivePtr<TKqlTransformContext> transformCtx) -{ - return new TKqpSubstituteTransformer(txState, transformCtx); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp index 90ae5876243..9f12a89f54a 100644 --- a/ydb/core/kqp/prepare/kqp_type_ann.cpp +++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp @@ -1,4 +1,4 @@ -#include "kqp_prepare_impl.h" +#include "kqp_prepare.h" #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h> @@ -1346,29 +1346,5 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckQueryTransformer() { }); } -TAutoPtr<IGraphTransformer> CreateKqpCheckKiProgramTransformer() { - return CreateFunctorTransformer( - [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus { - output = input; - - YQL_ENSURE(TMaybeNode<TKiProgram>(input)); - - auto program = TKiProgram(input); - auto effectsType = program.Effects().Ptr()->GetTypeAnn(); - bool typeOk = EnsureListType(input->Pos(), *effectsType, ctx); - if (typeOk) { - auto listType = effectsType->Cast<TListExprType>(); - typeOk = listType->GetItemType()->GetKind() == ETypeAnnotationKind::Void; - } - if (!typeOk) { - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() - << "Invalid program effects type: " << FormatType(effectsType))); - return TStatus::Error; - } - - return TStatus::Ok; - }); -} - } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 30eb9642409..a29c1d11c87 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -74,7 +74,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) { std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(), false); return CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), - server.GetRuntime()->GetNodeId(0), counters, MakeMiniKQLCompileServiceID()); + server.GetRuntime()->GetNodeId(0), counters); } void TestListPathCommon(TIntrusivePtr<IKikimrGateway> gateway) { diff --git a/ydb/core/kqp/ut/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/kqp_indexes_ut.cpp index 9a52865690e..d4a26fc98b7 100644 --- a/ydb/core/kqp/ut/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/kqp_indexes_ut.cpp @@ -35,7 +35,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) { counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(), false); return NKqp::CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(), - server.GetRuntime()->GetNodeId(0), counters, MakeMiniKQLCompileServiceID()); + server.GetRuntime()->GetNodeId(0), counters); } TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> gateway, |