aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-11-03 18:54:45 +0300
committerspuchin <spuchin@ydb.tech>2022-11-03 18:54:45 +0300
commitde0d350a9fcaa1df3682891fbada77ff32c92e5a (patch)
tree3cb6ebe7b2197d7d12035d39285b70362b79655f
parentfa02a88cc4bc0b204f32604325af59b1bb1439c2 (diff)
downloadydb-de0d350a9fcaa1df3682891fbada77ff32c92e5a.tar.gz
Remove OldEngine prepare/execution code. ()
-rw-r--r--ydb/core/kqp/common/kqp_gateway.h44
-rw-r--r--ydb/core/kqp/host/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp31
-rw-r--r--ydb/core/kqp/host/kqp_host_impl.h10
-rw-r--r--ydb/core/kqp/host/kqp_run_prepared.cpp213
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp84
-rw-r--r--ydb/core/kqp/kqp_compile_actor.cpp2
-rw-r--r--ydb/core/kqp/kqp_ic_gateway.cpp407
-rw-r--r--ydb/core/kqp/kqp_impl.h4
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp2
-rw-r--r--ydb/core/kqp/prepare/CMakeLists.txt9
-rw-r--r--ydb/core/kqp/prepare/kqp_prepare.h124
-rw-r--r--ydb/core/kqp/prepare/kqp_prepare_impl.h36
-rw-r--r--ydb/core/kqp/prepare/kqp_query_analyze.cpp566
-rw-r--r--ydb/core/kqp/prepare/kqp_query_exec.cpp716
-rw-r--r--ydb/core/kqp/prepare/kqp_query_finalize.cpp638
-rw-r--r--ydb/core/kqp/prepare/kqp_query_plan.cpp63
-rw-r--r--ydb/core/kqp/prepare/kqp_query_plan.h16
-rw-r--r--ydb/core/kqp/prepare/kqp_query_rewrite.cpp398
-rw-r--r--ydb/core/kqp/prepare/kqp_query_simplify.cpp372
-rw-r--r--ydb/core/kqp/prepare/kqp_query_substitute.cpp73
-rw-r--r--ydb/core/kqp/prepare/kqp_type_ann.cpp26
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/kqp_indexes_ut.cpp2
24 files changed, 89 insertions, 3750 deletions
diff --git a/ydb/core/kqp/common/kqp_gateway.h b/ydb/core/kqp/common/kqp_gateway.h
index 196f6098281..91379803825 100644
--- a/ydb/core/kqp/common/kqp_gateway.h
+++ b/ydb/core/kqp/common/kqp_gateway.h
@@ -29,41 +29,6 @@ struct TKqpParamsMap {
class IKqpGateway : public NYql::IKikimrGateway {
public:
- struct TMkqlResult : public TGenericResult {
- TString CompiledProgram;
- NKikimrMiniKQL::TResult Result;
- NKikimrQueryStats::TTxStats TxStats;
- };
-
- struct TMkqlSettings {
- bool LlvmRuntime = false;
- bool CollectStats = false;
- TMaybe<ui64> PerShardKeysSizeLimitBytes;
- ui64 CancelAfterMs = 0;
- ui64 TimeoutMs = 0;
- NYql::TKikimrQueryPhaseLimits Limits;
- };
-
- struct TRunResponse {
- bool HasProxyError;
- ui32 ProxyStatus;
- TString ProxyStatusName;
- TString ProxyStatusDesc;
-
- bool HasExecutionEngineError;
- TString ExecutionEngineStatusName;
- TString ExecutionEngineStatusDesc;
-
- NYql::TIssues Issues;
-
- TString MiniKQLErrors;
- TString DataShardErrors;
- NKikimrTxUserProxy::TMiniKQLCompileResults MiniKQLCompileResults;
-
- NKikimrMiniKQL::TResult ExecutionEngineEvaluatedResponse;
- NKikimrQueryStats::TTxStats TxStats;
- };
-
struct TPhysicalTxData : private TMoveOnly {
std::shared_ptr<const NKqpProto::TKqpPhyTx> Body;
TKqpParamsMap Params;
@@ -146,15 +111,6 @@ public:
};
public:
- /* Mkql */
- virtual NThreading::TFuture<TMkqlResult> ExecuteMkql(const TString& cluster, const TString& program,
- TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) = 0;
-
- virtual NThreading::TFuture<TMkqlResult> ExecuteMkqlPrepared(const TString& cluster, const TString& program,
- TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) = 0;
-
- virtual NThreading::TFuture<TMkqlResult> PrepareMkql(const TString& cluster, const TString& program) = 0;
-
/* Snapshots */
virtual NThreading::TFuture<TKqpSnapshotHandle> CreatePersistentSnapshot(const TVector<TString>& tablePaths,
TDuration queryTimeout) = 0;
diff --git a/ydb/core/kqp/host/CMakeLists.txt b/ydb/core/kqp/host/CMakeLists.txt
index 5324cac490b..d0e18d46391 100644
--- a/ydb/core/kqp/host/CMakeLists.txt
+++ b/ydb/core/kqp/host/CMakeLists.txt
@@ -35,7 +35,6 @@ target_sources(core-kqp-host PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_explain_prepared.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_physical.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_prepared.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_run_scan.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/host/kqp_runner.cpp
)
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 8d80ff05c0a..85b0f0d8804 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -654,32 +654,21 @@ public:
if (!AsyncResult) {
auto& query = QueryCtx->PreparedQuery;
YQL_ENSURE(QueryCtx->Type != EKikimrQueryType::Unspecified);
+ YQL_ENSURE(query->GetVersion() == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1);
- if (query->GetVersion() == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1) {
- if (CurrentKqlIndex) {
- return TStatus::Ok;
- }
+ if (CurrentKqlIndex) {
+ return TStatus::Ok;
+ }
- std::shared_ptr<const NKqpProto::TKqpPhyQuery> phyQuery(query, &query->GetPhysicalQuery());
+ std::shared_ptr<const NKqpProto::TKqpPhyQuery> phyQuery(query, &query->GetPhysicalQuery());
- if (QueryCtx->Type == EKikimrQueryType::Scan) {
- AsyncResult = KqpRunner->ExecutePreparedScanQuery(Cluster, input.Get(), std::move(phyQuery),
- ctx, ExecuteCtx->ReplyTarget);
- } else {
- YQL_ENSURE(QueryCtx->Type == EKikimrQueryType::Dml);
- AsyncResult = KqpRunner->ExecutePreparedQueryNewEngine(Cluster, input.Get(), std::move(phyQuery),
- ctx, ExecuteCtx->Settings);
- }
+ if (QueryCtx->Type == EKikimrQueryType::Scan) {
+ AsyncResult = KqpRunner->ExecutePreparedScanQuery(Cluster, input.Get(), std::move(phyQuery),
+ ctx, ExecuteCtx->ReplyTarget);
} else {
- if (CurrentKqlIndex >= query->KqlsSize()) {
- return TStatus::Ok;
- }
-
- const auto& kql = query->GetKqls(CurrentKqlIndex);
-
YQL_ENSURE(QueryCtx->Type == EKikimrQueryType::Dml);
- YQL_ENSURE(!kql.GetSettings().GetNewEngine());
- AsyncResult = KqpRunner->ExecutePreparedDataQuery(Cluster, input.Get(), kql, ctx, ExecuteCtx->Settings);
+ AsyncResult = KqpRunner->ExecutePreparedQueryNewEngine(Cluster, input.Get(), std::move(phyQuery),
+ ctx, ExecuteCtx->Settings);
}
}
diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h
index d000655c9da..d3d8966b1c6 100644
--- a/ydb/core/kqp/host/kqp_host_impl.h
+++ b/ydb/core/kqp/host/kqp_host_impl.h
@@ -320,10 +320,6 @@ public:
const NYql::TExprNode::TPtr& query, NYql::TExprContext& ctx,
const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0;
- virtual TIntrusivePtr<TAsyncQueryResult> ExecutePreparedDataQuery(const TString& cluster,
- NYql::TExprNode* queryExpr, const NKikimrKqp::TPreparedKql& kql, NYql::TExprContext& ctx,
- const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0;
-
virtual TIntrusivePtr<TAsyncQueryResult> ExecutePreparedQueryNewEngine(const TString& cluster,
const NYql::TExprNode::TPtr& world, std::shared_ptr<const NKqpProto::TKqpPhyQuery>&& phyQuery,
NYql::TExprContext& ctx, const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0;
@@ -337,12 +333,8 @@ TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, co
TIntrusivePtr<NYql::TTypeAnnotationContext> typesCtx, TIntrusivePtr<NYql::TKikimrSessionContext> sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry);
-TAutoPtr<NYql::IGraphTransformer> CreateKqpExecutePreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
- const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx);
-
TAutoPtr<NYql::IGraphTransformer> CreateKqpAcquireMvccSnapshotTransformer(TIntrusivePtr<IKqpGateway> gateway,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool newEngine = false);
+ TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx);
TAutoPtr<NYql::IGraphTransformer> CreateKqpExecutePhysicalDataTransformer(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState,
diff --git a/ydb/core/kqp/host/kqp_run_prepared.cpp b/ydb/core/kqp/host/kqp_run_prepared.cpp
deleted file mode 100644
index b200da0f961..00000000000
--- a/ydb/core/kqp/host/kqp_run_prepared.cpp
+++ /dev/null
@@ -1,213 +0,0 @@
-#include "kqp_host_impl.h"
-
-#include <ydb/library/yql/utils/log/log.h>
-
-#include <google/protobuf/text_format.h>
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NYql;
-using namespace NYql::NNodes;
-using namespace NThreading;
-
-namespace {
-
-class TKqpExecutePreparedTransformer : public TGraphTransformerBase {
-public:
- TKqpExecutePreparedTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx)
- : Gateway(gateway)
- , Cluster(cluster)
- , TxState(txState)
- , TransformCtx(transformCtx)
- , CurrentMkqlIndex(0)
- {}
-
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- output = input;
-
- const auto& kql = TransformCtx->GetPreparedKql();
-
- // TODO: Enable after switch to NewEngine
- // YQL_ENSURE(!TransformCtx->Config->HasKqpForceNewEngine());
-
- if (CurrentMkqlIndex >= kql.MkqlsSize()) {
- return Finish(ctx);
- }
-
- const auto& mkql = kql.GetMkqls(CurrentMkqlIndex);
-
- AcquireLocks = ShouldAcquireLocks(kql);
-
- Promise = NewPromise();
-
- if (!Execute(mkql, MkqlExecuteResult.Future)) {
- return TStatus::Error;
- }
-
- auto promise = Promise;
- MkqlExecuteResult.Future.Apply([promise](const TFuture<IKqpGateway::TMkqlResult> future) mutable {
- YQL_ENSURE(future.HasValue());
- promise.SetValue();
- });
-
- return TStatus::Async;
- }
-
- TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
- Y_UNUSED(input);
- return Promise.GetFuture();
- }
-
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- output = input;
-
- NKikimr::NKqp::IKqpGateway::TMkqlResult result(MkqlExecuteResult.Future.ExtractValue());
- result.ReportIssues(ctx.IssueManager);
- if (!result.Success()) {
- return TStatus::Error;
- }
-
- const auto& mkql = TransformCtx->GetPreparedKql().GetMkqls(CurrentMkqlIndex);
- if (mkql.GetIsPure()) {
- Y_VERIFY_DEBUG(result.TxStats.TableAccessStatsSize() == 0);
- }
-
- auto mkqlResult = MakeSimpleShared<NKikimrMiniKQL::TResult>();
- mkqlResult->Swap(&result.Result);
-
- if (AcquireLocks) {
- if (!UnpackMergeLocks(*mkqlResult, TxState->Tx(), ctx)) {
- return TStatus::Error;
- }
- }
-
- TransformCtx->MkqlResults.push_back(mkqlResult);
- TransformCtx->AddMkqlStats(MkqlExecuteResult.Program, std::move(result.TxStats));
-
- ++CurrentMkqlIndex;
- return TStatus::Repeat;
- }
-
- void Rewind() override {
- CurrentMkqlIndex = 0;
- }
-
-private:
- TStatus Finish(TExprContext& ctx) {
- const auto& kql = TransformCtx->GetPreparedKql();
- if (!kql.GetEffects().empty()) {
- YQL_ENSURE(kql.GetEffects().size() == 1);
-
- auto& effect = kql.GetEffects(0);
-
- TExprNode::TPtr expr;
- if (!GetExpr(effect.GetNodeAst(), expr, ctx)) {
- return TStatus::Error;
- }
-
- TVector<NKikimrKqp::TParameterBinding> bindings(effect.GetBindings().begin(),
- effect.GetBindings().end());
-
- bool preserveParams = !TransformCtx->Settings.GetCommitTx();
- if (!AddDeferredEffect(TExprBase(expr), bindings, ctx, *TxState, *TransformCtx, preserveParams)) {
- return TStatus::Error;
- }
- }
- return TStatus::Ok;
- }
-
- bool Execute(const NKikimrKqp::TPreparedMkql& mkql, TFuture<IKqpGateway::TMkqlResult>& future) {
- if (YQL_CLOG_ACTIVE(DEBUG, ProviderKqp)) {
- TString mkqlText;
- NProtoBuf::TextFormat::PrintToString(mkql, &mkqlText);
- YQL_CLOG(DEBUG, ProviderKqp) << "Mkql:" << Endl << mkqlText;
- }
-
- TVector<NKikimrKqp::TParameterBinding> bindings(mkql.GetBindings().begin(), mkql.GetBindings().end());
- auto execParams = BuildParamsMap(bindings, TxState, TransformCtx, AcquireLocks);
-
- if (YQL_CLOG_ACTIVE(TRACE, ProviderKqp)) {
- TStringBuilder paramsTextBuilder;
- for (auto& pair : execParams.Values) {
- TString paramText;
- NProtoBuf::TextFormat::PrintToString(pair.second.GetValue(), &paramText);
- paramsTextBuilder << pair.first << ": " << paramText << Endl;
- }
-
- YQL_CLOG(TRACE, ProviderKqp) << "MiniKQL parameters:" << Endl << paramsTextBuilder;
- }
-
- if (TransformCtx->QueryCtx->StatsMode == EKikimrStatsMode::Full) {
- MkqlExecuteResult.Program = mkql.GetProgramText();
- }
-
- future = Gateway->ExecuteMkqlPrepared(Cluster, mkql.GetProgram(), std::move(execParams),
- TransformCtx->GetMkqlSettings(false, Gateway->GetCurrentTime()),
- TxState->Tx().GetSnapshot());
- return true;
- }
-
- bool GetExpr(const TString& astStr, TExprNode::TPtr& expr, TExprContext& ctx) {
- auto astRes = ParseAst(astStr);
- ctx.IssueManager.AddIssues(astRes.Issues);
- if (!astRes.IsOk()) {
- return false;
- }
-
- return CompileExpr(*astRes.Root, expr, ctx, nullptr);
- }
-
- bool ShouldAcquireLocks(const NKikimrKqp::TPreparedKql& kql) {
- if (*TxState->Tx().EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
- return false;
- }
-
- if (TxState->Tx().Locks.Broken()) {
- return false; // Do not acquire locks after first lock issue
- }
-
- if (!TxState->Tx().DeferredEffects.Empty() || !kql.GetEffects().empty()) {
- return true; // Acquire locks in read write tx
- }
-
- if (!TransformCtx->Settings.GetCommitTx()) {
- return true; // It is not a commit tx
- }
-
- if (TxState->Tx().GetSnapshot().IsValid()) {
- return false; // Read only tx with snapshot, no need to acquire locks
- }
-
- if (CurrentMkqlIndex == kql.MkqlsSize() - 1 && !TxState->Tx().Locks.HasLocks()) {
- return false; // Final phase of read only tx, no need to acquire locks
- }
-
- return true;
- }
-
-private:
- TIntrusivePtr<IKqpGateway> Gateway;
- TString Cluster;
-
- TIntrusivePtr<TKqpTransactionState> TxState;
- TIntrusivePtr<TKqlTransformContext> TransformCtx;
-
- ui32 CurrentMkqlIndex;
- bool AcquireLocks;
- TMkqlExecuteResult MkqlExecuteResult;
- TPromise<void> Promise;
-};
-
-} // namespace
-
-TAutoPtr<IGraphTransformer> CreateKqpExecutePreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
- const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx)
-{
- return new TKqpExecutePreparedTransformer(gateway, cluster, txState, transformCtx);
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index c3a2196a66b..0eb7a61a13d 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -145,41 +145,9 @@ public:
sessionCtx->TablesPtr()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
{
- KqlTypeAnnTransformer = CreateTypeAnnotationTransformer(CreateExtCallableTypeAnnotationTransformer(*typesCtx),
- *typesCtx);
-
auto logLevel = NYql::NLog::ELevel::TRACE;
auto logComp = NYql::NLog::EComponent::ProviderKqp;
- KqlOptimizeTransformer = TTransformationPipeline(typesCtx)
- .AddServiceTransformers()
- .Add(TLogExprTransformer::Sync("KqlOptimizeTransformer", logComp, logLevel), "LogKqlOptimize")
- .AddTypeAnnotationTransformer()
- .AddPostTypeAnnotation()
- .AddOptimization(false)
- .Build(false);
-
- KqlPrepareTransformer = TTransformationPipeline(typesCtx)
- .AddServiceTransformers()
- .Add(TLogExprTransformer::Sync("KqlPrepareTransformer", logComp, logLevel), "LogKqlRun")
- .Add(new TKqpIterationGuardTransformer(), "IterationGuard")
- .AddTypeAnnotationTransformer()
- .Add(CreateKqpCheckKiProgramTransformer(), "CheckQuery")
- .Add(CreateKqpSimplifyTransformer(), "Simplify")
- .Add(CreateKqpAnalyzeTransformer(TransformCtx), "Analyze")
- .Add(CreateKqpRewriteTransformer(TransformCtx), "Rewrite")
- .Add(CreateKqpExecTransformer(Gateway, Cluster, TxState, TransformCtx), "Prepare")
- .Add(CreateKqpSubstituteTransformer(TxState, TransformCtx), "Substitute")
- .Add(CreateKqpFinalizeTransformer(Gateway, Cluster, TxState, TransformCtx), "Finalize")
- .Build(false);
-
- PreparedRunTransformer = TTransformationPipeline(typesCtx)
- .Add(TLogExprTransformer::Sync("PreparedRun iteration", logComp, logLevel), "KqlPreparedRun")
- .Add(CreateKqpAcquireMvccSnapshotTransformer(Gateway, TxState, TransformCtx), "AcquireMvccSnapshot")
- .Add(CreateKqpExecutePreparedTransformer(Gateway, Cluster, TxState, TransformCtx), "ExecutePrepared")
- .Add(CreateKqpFinalizeTransformer(Gateway, Cluster, TxState, TransformCtx), "Finalize")
- .Build(false);
-
PreparedExplainTransformer = TTransformationPipeline(typesCtx)
.Add(CreateKqpExplainPreparedTransformer(Gateway, Cluster, TransformCtx), "ExplainQuery")
.Build(false);
@@ -230,7 +198,7 @@ public:
.Build(false);
PhysicalRunQueryTransformer = TTransformationPipeline(typesCtx)
- .Add(CreateKqpAcquireMvccSnapshotTransformer(Gateway, TxState, TransformCtx, true), "AcquireMvccSnapshot")
+ .Add(CreateKqpAcquireMvccSnapshotTransformer(Gateway, TxState, TransformCtx), "AcquireMvccSnapshot")
.Add(CreateKqpExecutePhysicalDataTransformer(Gateway, Cluster, TxState, TransformCtx), "ExecutePhysical")
.Build(false);
@@ -277,34 +245,6 @@ public:
return PrepareQueryInternal(cluster, dataQuery, ctx, scanSettings);
}
- TIntrusivePtr<TAsyncQueryResult> ExecutePreparedDataQuery(const TString& cluster, TExprNode* queryExpr,
- const NKikimrKqp::TPreparedKql& kql, TExprContext& ctx,
- const IKikimrQueryExecutor::TExecuteSettings& settings) override
- {
- YQL_ENSURE(false, "Unexpected query execute in OldEngine mode.");
-
- YQL_ENSURE(queryExpr->Type() == TExprNode::World);
- YQL_ENSURE(cluster == Cluster);
-
- PreparedRunTransformer->Rewind();
-
- TransformCtx->Reset();
- TransformCtx->Settings.CopyFrom(kql.GetSettings());
- TransformCtx->Settings.SetCommitTx(settings.CommitTx);
- TransformCtx->Settings.SetRollbackTx(settings.RollbackTx);
- TransformCtx->PreparedKql = &kql;
-
- YQL_ENSURE(TxState->Tx().EffectiveIsolationLevel);
- YQL_ENSURE(TransformCtx->Settings.GetIsolationLevel() == NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
-
- bool strictDml = MergeFlagValue(Config->StrictDml.Get(Cluster), settings.StrictDml);
- if (!ApplyTableOperations(kql, strictDml, ctx)) {
- return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
- }
-
- return MakeIntrusive<TAsyncRunResult>(queryExpr, ctx, *PreparedRunTransformer, *TransformCtx);
- }
-
TIntrusivePtr<TAsyncQueryResult> ExecutePreparedQueryNewEngine(const TString& cluster,
const NYql::TExprNode::TPtr& world, std::shared_ptr<const NKqpProto::TKqpPhyQuery>&& phyQuery, TExprContext& ctx,
const IKikimrQueryExecutor::TExecuteSettings& settings) override
@@ -516,10 +456,6 @@ private:
TIntrusivePtr<TKqpOptimizeContext> OptimizeCtx;
TIntrusivePtr<TKqpBuildQueryContext> BuildQueryCtx;
- TAutoPtr<IGraphTransformer> KqlTypeAnnTransformer;
- TAutoPtr<IGraphTransformer> KqlOptimizeTransformer;
- TAutoPtr<IGraphTransformer> KqlPrepareTransformer;
- TAutoPtr<IGraphTransformer> PreparedRunTransformer;
TAutoPtr<IGraphTransformer> PreparedExplainTransformer;
TAutoPtr<IGraphTransformer> PhysicalOptimizeTransformer;
@@ -532,19 +468,17 @@ private:
class TKqpAcquireMvccSnapshotTransformer : public TGraphTransformerBase {
public:
TKqpAcquireMvccSnapshotTransformer(TIntrusivePtr<IKqpGateway> gateway, TIntrusivePtr<TKqlTransformContext> transformCtx,
- TIntrusivePtr<TKqpTransactionState> txState, bool newEngine)
+ TIntrusivePtr<TKqpTransactionState> txState)
: Gateway(std::move(gateway))
, TransformCtx(std::move(transformCtx))
- , NewEngine(newEngine)
- , TxState(std::move(txState))
- {}
+ , TxState(std::move(txState)) {}
TStatus DoTransform(NYql::TExprNode::TPtr input, NYql::TExprNode::TPtr& output, NYql::TExprContext&) override {
output = input;
if (!NeedSnapshot(TxState->Tx(), *TransformCtx->Config, TransformCtx->Settings.GetRollbackTx(),
- TransformCtx->Settings.GetCommitTx(), NewEngine ? TransformCtx->PhysicalQuery.get() : nullptr,
- NewEngine ? nullptr : TransformCtx->PreparedKql)) {
+ TransformCtx->Settings.GetCommitTx(),TransformCtx->PhysicalQuery.get(), nullptr))
+ {
return TStatus::Ok;
}
@@ -605,13 +539,9 @@ public:
}
private:
-
-private:
TIntrusivePtr<IKqpGateway> Gateway;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
- bool NewEngine;
-
NThreading::TFuture<IKqpGateway::TKqpSnapshotHandle> SnapshotFuture;
NThreading::TPromise<void> Promise;
TIntrusivePtr<TKqpTransactionState> TxState;
@@ -620,8 +550,8 @@ private:
} // namespace
TAutoPtr<NYql::IGraphTransformer> CreateKqpAcquireMvccSnapshotTransformer(TIntrusivePtr<IKqpGateway> gateway,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool newEngine) {
- return new TKqpAcquireMvccSnapshotTransformer(gateway, transformCtx, txState, newEngine);
+ TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx) {
+ return new TKqpAcquireMvccSnapshotTransformer(gateway, transformCtx, txState);
}
TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp
index bb0d0e71a81..407a7eec910 100644
--- a/ydb/core/kqp/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/kqp_compile_actor.cpp
@@ -95,7 +95,7 @@ public:
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(TlsActivationContext->ActorSystem(), true);
Gateway = CreateKikimrIcGateway(Query.Cluster, Query.Database, std::move(loader),
- ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters, MakeMiniKQLCompileServiceID());
+ ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters);
Gateway->SetToken(Query.Cluster, UserToken);
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp
index 33d3d2ceb0c..7a407992e0e 100644
--- a/ydb/core/kqp/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/kqp_ic_gateway.cpp
@@ -6,9 +6,7 @@
#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/kikimr_issue.h>
-#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
-#include <ydb/core/engine/mkql_engine_flat.h>
#include <ydb/core/engine/mkql_proto.h>
#include <ydb/core/kqp/common/kqp_gateway.h>
#include <ydb/core/kqp/executer/kqp_executer.h>
@@ -25,7 +23,6 @@
#include <ydb/library/aclib/aclib.h>
#include <ydb/public/lib/base/msgbus_status.h>
-#include <ydb/library/yql/minikql/mkql_node_serialization.h>
#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -45,8 +42,6 @@ using namespace NYql::NCommon;
using namespace NSchemeShard;
using namespace NKikimrSchemeOp;
-using NKikimrTxUserProxy::TMiniKQLTransaction;
-
constexpr const IKqpGateway::TKqpSnapshot IKqpGateway::TKqpSnapshot::InvalidSnapshot = TKqpSnapshot();
#define STATIC_ASSERT_STATE_EQUAL(name) \
@@ -480,184 +475,6 @@ private:
TVector<NYql::NDqProto::TDqExecutionStats> Executions;
};
-class TMkqlRequestHandler : public TRequestHandlerBase<
- TMkqlRequestHandler,
- TEvTxUserProxy::TEvProposeTransaction,
- TEvTxUserProxy::TEvProposeTransactionStatus,
- IKqpGateway::TMkqlResult>
-{
-public:
- using TBase = typename TMkqlRequestHandler::TBase;
- using TCallbackFunc = typename TBase::TCallbackFunc;
- using TRequest = TEvTxUserProxy::TEvProposeTransaction;
- using TResponse = TEvTxUserProxy::TEvProposeTransactionStatus;
- using TResult = IKqpGateway::TMkqlResult;
-
- TMkqlRequestHandler(const TAlignedPagePoolCounters& allocCounters, TRequest* request,
- TKqpParamsMap&& paramsMap, TPromise<TResult> promise, TCallbackFunc callback,
- const TActorId& miniKqlComplileServiceActorId)
- : TBase(request, promise, callback)
- , ParamsMap(std::move(paramsMap))
- , CompilationPending(false)
- , CompilationRetried(false)
- , AllocCounters(allocCounters)
- , MiniKqlComplileServiceActorId(miniKqlComplileServiceActorId)
- {}
-
- void Bootstrap(const TActorContext& ctx) {
- auto& mkqlTx = *Request->Record.MutableTransaction()->MutableMiniKQLTransaction();
-
- if (mkqlTx.HasProgram() && mkqlTx.GetProgram().HasText()) {
- MkqlProgramText = mkqlTx.GetProgram().GetText();
- CompileProgram(ctx, mkqlTx.GetMode() == NKikimrTxUserProxy::TMiniKQLTransaction::COMPILE);
- }
-
- mkqlTx.SetCollectStats(true);
- YQL_ENSURE(!mkqlTx.HasParams());
-
- if (!ParamsMap.Values.empty()) {
- try {
- mkqlTx.MutableParams()->SetBin(BuildParams(ParamsMap, AllocCounters, ctx));
- }
- catch(const yexception& e) {
- Promise.SetValue(ResultFromError<TResult>(e.what()));
- this->Die(ctx);
- return;
- }
- }
-
- ProceedWithExecution(ctx);
-
- this->Become(&TMkqlRequestHandler::AwaitState);
- }
-
- void Handle(TMiniKQLCompileServiceEvents::TEvCompileStatus::TPtr &ev, const TActorContext &ctx) {
- const auto& result = ev->Get()->Result;
- auto& mkqlTx = *Request->Record.MutableTransaction()->MutableMiniKQLTransaction();
-
- if (!result.Errors.Empty()) {
- Promise.SetValue(ResultFromIssues<TResult>(GetMkqlCompileStatus(result.Errors),
- "Query compilation failed", result.Errors));
-
- this->Die(ctx);
- return;
- }
-
- if (mkqlTx.GetMode() == NKikimrTxUserProxy::TMiniKQLTransaction::COMPILE) {
- TResult reply;
- reply.SetSuccess();
- reply.CompiledProgram = result.CompiledProgram;
- Promise.SetValue(std::move(reply));
- this->Die(ctx);
- return;
- }
-
- auto& pgm = *mkqlTx.MutableProgram();
- pgm.ClearText();
- pgm.SetBin(result.CompiledProgram);
-
- CompileResolveCookies = std::move(ev->Get()->CompileResolveCookies);
-
- CompilationPending = false;
- ProceedWithExecution(ctx);
- }
-
- void HandleResponse(TResponse::TPtr &ev, const TActorContext &ctx) {
- auto& response = *ev->Get();
- ui32 status = response.Record.GetStatus();
-
- bool resolveError = status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ResolveError ||
- status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable;
-
- if (resolveError && !CompilationRetried && !MkqlProgramText.empty()) {
- CompilationRetried = true;
- CompileProgram(ctx, false);
- ProceedWithExecution(ctx);
- return;
- }
-
- Callback(Promise, std::move(*ev->Get()));
- this->Die(ctx);
- }
-
- using TBase::Handle;
-
- STFUNC(AwaitState) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TResponse, HandleResponse);
- HFunc(TEvTabletPipe::TEvClientConnected, Handle);
- HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- HFunc(TMiniKQLCompileServiceEvents::TEvCompileStatus, Handle);
- default:
- TBase::HandleUnexpectedEvent("TMkqlRequestHandler", ev->GetTypeRewrite(), ctx);
- }
- }
-
-private:
- static TString BuildParams(const TKqpParamsMap& paramsMap, const TAlignedPagePoolCounters& counters,
- const TActorContext &ctx)
- {
- NMiniKQL::TScopedAlloc alloc(__LOCATION__, counters, AppData(ctx)->FunctionRegistry->SupportsSizedAllocators());
- NMiniKQL::TTypeEnvironment env(alloc);
-
- NMiniKQL::TStructLiteralBuilder structBuilder(env);
- structBuilder.Reserve(paramsMap.Values.size());
- for (auto& pair : paramsMap.Values) {
- const TString& name = pair.first;
- const NYql::NDq::TMkqlValueRef& param = pair.second;
-
- auto valueNode = NMiniKQL::ImportValueFromProto(param.GetType(), param.GetValue(), env);
- structBuilder.Add(name, valueNode);
- }
-
- auto node = NMiniKQL::TRuntimeNode(structBuilder.Build(), true);
- return NMiniKQL::SerializeRuntimeNode(node, env);
- }
-
- NYql::EYqlIssueCode GetMkqlCompileStatus(const NYql::TIssues& issues) {
- for (auto& issue : issues) {
- switch (issue.GetCode()) {
- case NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR:
- return TIssuesIds::KIKIMR_SCHEME_MISMATCH;
-
- case NKikimrIssues::TIssuesIds::RESOLVE_LOOKUP_ERROR:
- return TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE;
- }
- }
-
- return TIssuesIds::DEFAULT_ERROR;
- }
-
- void CompileProgram(const TActorContext &ctx, bool forceRefresh) {
- auto compileEv = MakeHolder<TMiniKQLCompileServiceEvents::TEvCompile>(MkqlProgramText);
- compileEv->ForceRefresh = forceRefresh;
- if (!CompileResolveCookies.empty()) {
- compileEv->CompileResolveCookies = std::move(CompileResolveCookies);
- }
- ctx.Send(MiniKqlComplileServiceActorId, compileEv.Release());
- CompilationPending = true;
- }
-
- void ProceedWithExecution(const TActorContext& ctx) {
- if (!CompilationPending) {
- TAutoPtr<TRequest> ev = new TRequest();
- ev->Record.CopyFrom(this->Request->Record);
-
- TActorId txproxy = MakeTxProxyID();
- ctx.Send(txproxy, ev.Release());
- }
- }
-
-private:
- TKqpParamsMap ParamsMap;
- bool CompilationPending;
- bool CompilationRetried;
- TString MkqlProgramText;
- THashMap<TString, ui64> CompileResolveCookies;
- TAlignedPagePoolCounters AllocCounters;
- TActorId MiniKqlComplileServiceActorId;
-};
-
class TSchemeOpRequestHandler: public TRequestHandlerBase<
TSchemeOpRequestHandler,
TEvTxUserProxy::TEvProposeTransaction,
@@ -994,15 +811,13 @@ private:
public:
TKikimrIcGateway(const TString& cluster, const TString& database, std::shared_ptr<IKqpTableMetadataLoader>&& metadataLoader,
- TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const TActorId& mkqlComplileService)
+ TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters)
: Cluster(cluster)
, Database(database)
, ActorSystem(actorSystem)
, NodeId(nodeId)
, Counters(counters)
- , MetadataLoader(std::move(metadataLoader))
- , MkqlComplileService(mkqlComplileService)
- {}
+ , MetadataLoader(std::move(metadataLoader)) {}
bool HasCluster(const TString& cluster) override {
return cluster == Cluster;
@@ -1799,26 +1614,6 @@ public:
}
}
- TFuture<TMkqlResult> ExecuteMkql(const TString& cluster, const TString& program,
- TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) override
- {
- YQL_ENSURE(false, "Unexpected exec mkql in OldEngine mode.");
-
- return RunInternal(cluster, program, std::move(params), false, false, settings, snapshot);
- }
-
- TFuture<TMkqlResult> ExecuteMkqlPrepared(const TString& cluster, const TString& program,
- TKqpParamsMap&& params, const TMkqlSettings& settings, const TKqpSnapshot& snapshot) override
- {
- YQL_ENSURE(false, "Unexpected exeс prepared mkql in OldEngine mode.");
- return RunInternal(cluster, program, std::move(params), false, true, settings, snapshot);
- }
-
- TFuture<TMkqlResult> PrepareMkql(const TString& cluster, const TString& program) override {
- YQL_ENSURE(false, "Unexpected prepare mkql in OldEngine mode.");
- return RunInternal(cluster, program, TKqpParamsMap(), true, false, TMkqlSettings());
- }
-
TFuture<TExecPhysicalResult> ExecutePhysical(TExecPhysicalRequest&& request, const NActors::TActorId& target) override {
return ExecutePhysicalQueryInternal(std::move(request), target, false);
}
@@ -2202,17 +1997,6 @@ private:
return promise.GetFuture();
}
- TFuture<TMkqlResult> SendMkqlRequest(TEvTxUserProxy::TEvProposeTransaction* request,
- TKqpParamsMap&& paramsMap, TMkqlRequestHandler::TCallbackFunc callback)
- {
- auto promise = NewPromise<TMkqlResult>();
- IActor* requestHandler = new TMkqlRequestHandler(Counters->Counters->AllocCounters, request,
- std::move(paramsMap), promise, callback, MkqlComplileService);
- RegisterActor(requestHandler);
-
- return promise.GetFuture();
- }
-
TFuture<TGenericResult> SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request)
{
auto promise = NewPromise<TGenericResult>();
@@ -2250,91 +2034,6 @@ private:
return cluster == Cluster;
}
- TFuture<TMkqlResult> RunInternal(const TString& cluster, const TString& program, TKqpParamsMap&& params,
- bool compileOnly, bool prepared, const TMkqlSettings& settings, const TKqpSnapshot& snapshot = TKqpSnapshot::InvalidSnapshot)
- {
- using TRequest = TEvTxUserProxy::TEvProposeTransaction;
-
- try {
- if (!CheckCluster(cluster)) {
- return InvalidCluster<TMkqlResult>(cluster);
- }
-
- auto ev = MakeHolder<TRequest>();
- ev->Record.SetDatabaseName(Database);
- if (UserToken) {
- ev->Record.SetUserToken(UserToken->Serialized);
- }
- auto& mkqlTx = *ev->Record.MutableTransaction()->MutableMiniKQLTransaction();
- mkqlTx.SetFlatMKQL(true);
- mkqlTx.SetMode(compileOnly ? TMiniKQLTransaction::COMPILE : TMiniKQLTransaction::COMPILE_AND_EXEC);
-
- if (prepared) {
- mkqlTx.MutableProgram()->SetBin(program);
- } else {
- mkqlTx.MutableProgram()->SetText(program);
- }
-
- if (!compileOnly) {
- if (settings.LlvmRuntime) {
- mkqlTx.SetLlvmRuntime(true);
- }
-
- if (settings.PerShardKeysSizeLimitBytes) {
- mkqlTx.SetPerShardKeysSizeLimitBytes(*settings.PerShardKeysSizeLimitBytes);
- }
-
- if (settings.CancelAfterMs) {
- ev->Record.SetCancelAfterMs(settings.CancelAfterMs);
- }
-
- if (settings.TimeoutMs) {
- ev->Record.SetExecTimeoutPeriod(settings.TimeoutMs);
- }
-
- const auto& limits = settings.Limits;
-
- if (limits.AffectedShardsLimit) {
- mkqlTx.MutableLimits()->SetAffectedShardsLimit(limits.AffectedShardsLimit);
- }
-
- if (limits.ReadsetCountLimit) {
- mkqlTx.MutableLimits()->SetReadsetCountLimit(limits.ReadsetCountLimit);
- }
-
- if (limits.ComputeNodeMemoryLimitBytes) {
- mkqlTx.MutableLimits()->SetComputeNodeMemoryLimitBytes(limits.ComputeNodeMemoryLimitBytes);
- }
-
- if (limits.TotalReadSizeLimitBytes) {
- mkqlTx.MutableLimits()->SetTotalReadSizeLimitBytes(limits.TotalReadSizeLimitBytes);
- }
-
- if (snapshot.IsValid()) {
- mkqlTx.SetSnapshotStep(snapshot.Step);
- mkqlTx.SetSnapshotTxId(snapshot.TxId);
- }
- }
-
- if (settings.CollectStats) {
- mkqlTx.SetCollectStats(true);
- }
-
- return SendMkqlRequest(ev.Release(), std::move(params),
- [compileOnly] (TPromise<TMkqlResult> promise, TTransactionResponse&& response) {
- try {
- promise.SetValue(GetMkqlResult(GetRunResponse(std::move(response.Record)), compileOnly));
- }
- catch (yexception& e) {
- promise.SetValue(ResultFromException<TMkqlResult>(e));
- }
- });
- }
- catch (yexception& e) {
- return MakeFuture(ResultFromException<TMkqlResult>(e));
- }
- }
-
TFuture<TExecPhysicalResult> ExecutePhysicalQueryInternal(TExecPhysicalRequest&& request, const TActorId& target,
bool streaming)
{
@@ -2378,40 +2077,6 @@ private:
}
private:
- static TRunResponse GetRunResponse(NKikimrTxUserProxy::TEvProposeTransactionStatus&& ev) {
- IKqpGateway::TRunResponse response;
-
- response.HasProxyError = ev.GetStatus() != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete
- && ev.GetStatus() != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecAlready;
- response.ProxyStatus = ev.GetStatus();
- if (response.HasProxyError) {
- NMsgBusProxy::ExplainProposeTransactionStatus(ev.GetStatus(), response.ProxyStatusName,
- response.ProxyStatusDesc);
- }
-
- auto executionResponseStatus = static_cast<NMiniKQL::IEngineFlat::EStatus>(
- ev.GetExecutionEngineResponseStatus());
- auto executionEngineStatus = static_cast<NMiniKQL::IEngineFlat::EResult>(
- ev.GetExecutionEngineStatus());
- response.HasExecutionEngineError = executionResponseStatus == NMiniKQL::IEngineFlat::EStatus::Error &&
- executionEngineStatus != NMiniKQL::IEngineFlat::EResult::Ok;
- if (response.HasExecutionEngineError) {
- NMsgBusProxy::ExplainExecutionEngineStatus(ev.GetExecutionEngineStatus(),
- response.ExecutionEngineStatusName, response.ExecutionEngineStatusDesc);
- }
-
- NYql::IssuesFromMessage(ev.GetIssues(), response.Issues);
-
- response.MiniKQLErrors = ev.GetMiniKQLErrors();
- response.DataShardErrors = ev.GetDataShardErrors();
- response.MiniKQLCompileResults = ev.GetMiniKQLCompileResults();
-
- response.ExecutionEngineEvaluatedResponse.Swap(ev.MutableExecutionEngineEvaluatedResponse());
- response.TxStats = ev.GetTxStats();
-
- return response;
- }
-
static TListPathResult GetListPathResult(const TPathDescription& pathDesc, const TString& path) {
if (pathDesc.GetSelf().GetPathType() != EPathTypeDir) {
return ResultFromError<TListPathResult>(TString("Directory not found: ") + path);
@@ -2496,66 +2161,6 @@ private:
}
}
- static TMkqlResult GetMkqlResult(TRunResponse&& response, bool compileOnly) {
- auto& txRes = response.MiniKQLCompileResults;
-
- if (txRes.ProgramCompileErrorsSize() > 0) {
- NYql::TIssues errors;
- for (size_t i = 0, end = txRes.ProgramCompileErrorsSize(); i < end; ++i) {
- const auto& err = txRes.GetProgramCompileErrors(i);
- errors.AddIssue(NYql::IssueFromMessage(err));
- }
- return ResultFromIssues<TMkqlResult>(TIssuesIds::KIKIMR_COMPILE_ERROR, "MiniKQL compilation error",
- errors);
- }
-
- YQL_ENSURE(txRes.ParamsCompileErrorsSize() == 0);
-
- if (!compileOnly) {
- NYql::TIssues internalIssues;
- if (response.HasExecutionEngineError) {
- internalIssues.AddIssue(TIssue(NYql::TPosition(), TString("Execution engine failure (")
- + response.ExecutionEngineStatusName + "): " + response.ExecutionEngineStatusDesc));
- }
-
- if (!response.DataShardErrors.empty()) {
- internalIssues.AddIssue(TIssue(NYql::TPosition(), TString("Data shard errors: ")
- + response.DataShardErrors));
- }
-
- if (!response.MiniKQLErrors.empty()) {
- internalIssues.AddIssue(TIssue(NYql::TPosition(), TString("Execution engine errors: ")
- + response.MiniKQLErrors));
- }
-
- if (response.HasProxyError) {
- auto message = TString("Error executing transaction (") + response.ProxyStatusName + "): "
- + response.ProxyStatusDesc;
-
- NYql::TIssue proxyIssue(NYql::TPosition(), message);
- for (auto& issue : internalIssues) {
- proxyIssue.AddSubIssue(MakeIntrusive<TIssue>(issue));
- }
- for (auto& issue : response.Issues) {
- proxyIssue.AddSubIssue(MakeIntrusive<TIssue>(issue));
- }
-
- return ResultFromIssues<TMkqlResult>(KikimrProxyErrorStatus(response.ProxyStatus), {proxyIssue});
- }
-
- if (!internalIssues.Empty()) {
- return ResultFromErrors<TMkqlResult>(internalIssues);
- }
- }
-
- TMkqlResult result;
- result.SetSuccess();
- result.CompiledProgram = txRes.GetCompiledProgram();
- result.Result.Swap(&response.ExecutionEngineEvaluatedResponse);
- result.TxStats.Swap(&response.TxStats);
- return result;
- }
-
static NYql::EYqlIssueCode KikimrProxyErrorStatus(ui32 proxyStatus) {
NYql::EYqlIssueCode status = TIssuesIds::DEFAULT_ERROR;
@@ -2919,16 +2524,16 @@ private:
TAlignedPagePoolCounters AllocCounters;
TMaybe<TUserTokenData> UserToken;
std::shared_ptr<IKqpTableMetadataLoader> MetadataLoader;
- TActorId MkqlComplileService;
};
} // namespace
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database,
- std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters,
- const TActorId& mkqlComplileService)
+ std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader>&& metadataLoader, TActorSystem* actorSystem,
+ ui32 nodeId, TKqpRequestCounters::TPtr counters)
{
- return MakeIntrusive<TKikimrIcGateway>(cluster, database, std::move(metadataLoader), actorSystem, nodeId, counters, mkqlComplileService);
+ return MakeIntrusive<TKikimrIcGateway>(cluster, database, std::move(metadataLoader), actorSystem, nodeId,
+ counters);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/kqp_impl.h b/ydb/core/kqp/kqp_impl.h
index 0f4a047fca6..368d9ca2209 100644
--- a/ydb/core/kqp/kqp_impl.h
+++ b/ydb/core/kqp/kqp_impl.h
@@ -84,8 +84,8 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters);
TIntrusivePtr<IKqpGateway> CreateKikimrIcGateway(const TString& cluster, const TString& database,
- std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters,
- const TActorId& MkqlCompileService);
+ std::shared_ptr<IKqpGateway::IKqpTableMetadataLoader>&& metadataLoader, NActors::TActorSystem* actorSystem,
+ ui32 nodeId, TKqpRequestCounters::TPtr counters);
TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const NYql::TIssue& issue);
Ydb::StatusIds::StatusCode GetYdbStatus(const NYql::NCommon::TOperationResult& queryResult);
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 3b2a3dd8e3d..3fa34288736 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -139,7 +139,7 @@ public:
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(TlsActivationContext->ActorSystem(), false);
Gateway = CreateKikimrIcGateway(Settings.Cluster, Settings.Database, std::move(loader),
- ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters, MakeMiniKQLCompileServiceID());
+ ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters);
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
diff --git a/ydb/core/kqp/prepare/CMakeLists.txt b/ydb/core/kqp/prepare/CMakeLists.txt
index ae03f2ae962..a9dfdeee9f6 100644
--- a/ydb/core/kqp/prepare/CMakeLists.txt
+++ b/ydb/core/kqp/prepare/CMakeLists.txt
@@ -22,17 +22,12 @@ target_link_libraries(core-kqp-prepare PUBLIC
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-prepare PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_analyze.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_exec.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_finalize.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_plan.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_rewrite.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_simplify.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_substitute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_type_ann.cpp
)
generate_enum_serilization(core-kqp-prepare
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_prepare_impl.h
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/prepare/kqp_query_plan.h
INCLUDE_HEADERS
- ydb/core/kqp/prepare/kqp_prepare_impl.h
+ ydb/core/kqp/prepare/kqp_query_plan.h
)
diff --git a/ydb/core/kqp/prepare/kqp_prepare.h b/ydb/core/kqp/prepare/kqp_prepare.h
index 27c956ea148..a03b40d1a6e 100644
--- a/ydb/core/kqp/prepare/kqp_prepare.h
+++ b/ydb/core/kqp/prepare/kqp_prepare.h
@@ -6,74 +6,6 @@
namespace NKikimr {
namespace NKqp {
-enum class ECommitSafety {
- Full,
- Safe,
- Moderate
-};
-
-struct TExprScope {
- NYql::NNodes::TCallable Callable;
- NYql::NNodes::TCoLambda Lambda;
- ui32 Depth;
-
- TExprScope(NYql::NNodes::TCallable callable, NYql::NNodes::TCoLambda Lambda, ui32 depth)
- : Callable(callable)
- , Lambda(Lambda)
- , Depth(depth) {}
-};
-
-struct TNodeInfo {
- TMaybe<TExprScope> Scope;
- bool IsImmediate;
- bool RequireImmediate;
- bool IsExecutable;
- bool AreInputsExecutable;
-
- TNodeInfo()
- : IsImmediate(false)
- , RequireImmediate(false)
- , IsExecutable(false)
- , AreInputsExecutable(false) {}
-};
-
-using TExprToExprListMap = THashMap<const NYql::TExprNode*, TVector<NYql::NNodes::TExprBase>>;
-using TExprToNodeInfoMap = THashMap<const NYql::TExprNode*, TNodeInfo>;
-
-struct TScopedNode {
- NYql::NNodes::TExprBase Node;
- TMaybe<TExprScope> Scope;
-
- TScopedNode(NYql::NNodes::TExprBase node, TMaybe<TExprScope> scope = {})
- : Node(node)
- , Scope(scope) {}
-};
-
-struct TKqpAnalyzeResults {
- bool CanExecute;
- TVector<TScopedNode> ExecutionRoots;
- TExprToNodeInfoMap ExprToNodeInfoMap;
- TExprToExprListMap LambdaToExecRootsMap;
- TExprToExprListMap CallableToExecRootsMap;
-};
-
-using TMkqlResult = NKikimrMiniKQL::TResult;
-
-struct TMkqlExecuteResult {
- TString Program;
- NThreading::TFuture<IKqpGateway::TMkqlResult> Future;
-
- TMkqlExecuteResult(const TString& program, const NThreading::TFuture<IKqpGateway::TMkqlResult>& future)
- : Program(program)
- , Future(future) {}
-
- TMkqlExecuteResult(const NThreading::TFuture<IKqpGateway::TMkqlResult>& future)
- : Program()
- , Future(future) {}
-
- TMkqlExecuteResult() {}
-};
-
struct TKqlTransformContext : TThrRefBase {
TKqlTransformContext(NYql::TKikimrConfiguration::TPtr& config, TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx,
TIntrusivePtr<NYql::TKikimrTablesData> tables)
@@ -87,49 +19,17 @@ struct TKqlTransformContext : TThrRefBase {
NKikimrKqp::TKqlSettings Settings;
NActors::TActorId ReplyTarget;
- TKqpAnalyzeResults AnalyzeResults;
- NKikimrKqp::TPreparedKql* PreparingKql = nullptr;
- const NKikimrKqp::TPreparedKql* PreparedKql;
NKqpProto::TKqpStatsQuery QueryStats;
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;
TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;
- ECommitSafety CommitSafety() const {
- auto safetyValue = Config->CommitSafety.Get().GetRef();
- if (safetyValue == "Full") {
- return ECommitSafety::Full;
- } else if (safetyValue == "Safe") {
- return ECommitSafety::Safe;
- } else if (safetyValue == "Moderate") {
- return ECommitSafety::Moderate;
- }
-
- YQL_ENSURE(false, "Unexpected value for CommitSafety.");
- }
-
- NKikimrKqp::TPreparedKql& GetPreparingKql() {
- YQL_ENSURE(PreparingKql);
- return *PreparingKql;
- }
-
- const NKikimrKqp::TPreparedKql& GetPreparedKql() {
- YQL_ENSURE(PreparedKql);
- return *PreparedKql;
- }
-
- IKqpGateway::TMkqlSettings GetMkqlSettings(bool hasDataEffects, TInstant now) const;
- void AddMkqlStats(const TString& program, NKikimrQueryStats::TTxStats&& txStats);
-
void Reset() {
Settings = {};
ReplyTarget = {};
- AnalyzeResults = {};
MkqlResults.clear();
QueryStats = {};
- PreparingKql = nullptr;
- PreparedKql = nullptr;
PhysicalQuery = nullptr;
PhysicalQueryResults.clear();
}
@@ -154,7 +54,6 @@ bool AddDeferredEffect(NYql::NNodes::TExprBase effect, const TVector<NKikimrKqp:
bool AddDeferredEffect(NYql::NNodes::TExprBase effect, NYql::TExprContext& ctx, TKqpTransactionState& txState,
TKqlTransformContext& transformCtx, bool preserveParamValues);
-NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock);
bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx,
NYql::TExprContext& ctx);
@@ -162,31 +61,10 @@ bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue&
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
TKqpTransactionContext& txCtx);
-bool UnpackMergeLocks(const NKikimrMiniKQL::TResult& result, TKqpTransactionContext& txCtx, NYql::TExprContext& ctx);
-
-TKqpParamsMap BuildParamsMap(const TVector<NKikimrKqp::TParameterBinding>& bindings,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool acquireLocks);
-
-TVector<NKikimrKqp::TParameterBinding> CollectParams(NYql::NNodes::TExprBase query);
-
-NKikimrMiniKQL::TParams GetLocksParamValue(const TKqpTxLocks& locks);
-
-TAutoPtr<NYql::IGraphTransformer> CreateKqpSimplifyTransformer();
-TAutoPtr<NYql::IGraphTransformer> CreateKqpAnalyzeTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx);
-TAutoPtr<NYql::IGraphTransformer> CreateKqpRewriteTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx);
-TAutoPtr<NYql::IGraphTransformer> CreateKqpExecTransformer(TIntrusivePtr<IKqpGateway> gateway,
- const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx);
-TAutoPtr<NYql::IGraphTransformer> CreateKqpSubstituteTransformer(TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx);
-TAutoPtr<NYql::IGraphTransformer> CreateKqpFinalizeTransformer(TIntrusivePtr<IKqpGateway> gateway,
- const TString& cluster, TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx);
-
TAutoPtr<NYql::IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cluster,
TIntrusivePtr<NYql::TKikimrTablesData> tablesData, NYql::TTypeAnnotationContext& typesCtx,
NYql::TKikimrConfiguration::TPtr config);
-TAutoPtr<NYql::IGraphTransformer> CreateKqpCheckKiProgramTransformer();
+
TAutoPtr<NYql::IGraphTransformer> CreateKqpCheckQueryTransformer();
} // namespace NKqp
diff --git a/ydb/core/kqp/prepare/kqp_prepare_impl.h b/ydb/core/kqp/prepare/kqp_prepare_impl.h
deleted file mode 100644
index 1d6e4c5064a..00000000000
--- a/ydb/core/kqp/prepare/kqp_prepare_impl.h
+++ /dev/null
@@ -1,36 +0,0 @@
-#pragma once
-
-#include "kqp_prepare.h"
-
-namespace NKikimr {
-namespace NKqp {
-
-enum class ETableReadType {
- Unspecified,
- FullScan,
- Scan,
- Lookup,
- MultiLookup,
-};
-
-enum class ETableWriteType {
- Unspecified,
- Upsert,
- MultiUpsert,
- Erase,
- MultiErase,
-};
-
-bool HasEffects(const NYql::NNodes::TKiProgram& program);
-bool HasResults(const NYql::NNodes::TKiProgram& program);
-
-NYql::NNodes::TCoList GetEmptyEffectsList(NYql::TPositionHandle pos, NYql::TExprContext& ctx);
-
-TMkqlExecuteResult ExecuteMkql(NYql::NNodes::TKiProgram program, TIntrusivePtr<IKqpGateway> gateway,
- const TString& cluster, NYql::TExprContext& ctx, TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx, bool hasDataEffects);
-
-void LogMkqlResult(const NKikimrMiniKQL::TResult& result, NYql::TExprContext& ctx);
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/prepare/kqp_query_analyze.cpp b/ydb/core/kqp/prepare/kqp_query_analyze.cpp
deleted file mode 100644
index b37e4eb0582..00000000000
--- a/ydb/core/kqp/prepare/kqp_query_analyze.cpp
+++ /dev/null
@@ -1,566 +0,0 @@
-#include "kqp_prepare_impl.h"
-
-#include <ydb/library/yql/core/yql_expr_optimize.h>
-#include <ydb/library/yql/core/yql_expr_type_annotation.h>
-#include <ydb/library/yql/utils/log/log.h>
-
-#include <util/generic/queue.h>
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NYql;
-using namespace NYql::NNodes;
-
-namespace {
-
-const THashSet<TStringBuf> SafeCallables {
- TCoJust::CallableName(),
- TCoCoalesce::CallableName(),
- TCoToOptional::CallableName(),
- TCoHead::CallableName(),
- TCoLast::CallableName(),
- TCoToList::CallableName(),
-
- TCoMember::CallableName(),
- TCoAsStruct::CallableName(),
-
- TCoNothing::CallableName(),
- TCoNull::CallableName(),
- TCoDefault::CallableName(),
- TCoExists::CallableName(),
-
- TCoNth::CallableName()
-};
-
-const THashSet<TStringBuf> ModerateCallables {
- TCoAddMember::CallableName(),
- TCoReplaceMember::CallableName(),
-};
-
-bool IsSafePayloadCallable(const TCallable& callable) {
- if (callable.Maybe<TCoDataCtor>()) {
- return true;
- }
-
- if (callable.Maybe<TCoCompare>()) {
- return true;
- }
-
- if (callable.Maybe<TCoAnd>()) {
- return true;
- }
-
- if (callable.Maybe<TCoOr>()) {
- return true;
- }
-
- if (callable.Maybe<TCoBinaryArithmetic>()) {
- return true;
- }
-
- if (callable.Maybe<TCoCountBase>()) {
- return true;
- }
-
- auto isOptInput = [](TExprBase input) {
- return input.Maybe<TCoToList>() || input.Maybe<TCoTake>().Input().Maybe<TCoToList>();
- };
-
- if (auto maybeFilter = callable.Maybe<TCoFilterBase>()) {
- auto filter = maybeFilter.Cast();
-
- if (filter.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) {
- return true;
- }
-
- if (isOptInput(filter.Input())) {
- return true;
- }
-
- return false;
- }
-
- if (auto maybeMap = callable.Maybe<TCoMapBase>()) {
- auto map = maybeMap.Cast();
-
- if (map.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) {
- return true;
- }
-
- if (isOptInput(map.Input())) {
- if (maybeMap.Maybe<TCoMap>()) {
- return true;
- } else {
- auto body = map.Lambda().Body();
-
- return body.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional ||
- body.Maybe<TCoToList>() || body.Maybe<TCoAsList>();
- }
- }
-
- return false;
- }
-
- return SafeCallables.contains(callable.CallableName());
-}
-
-bool IsModeratePayloadCallable(TCoNameValueTupleList key, const TCallable& callable) {
- if (IsSafePayloadCallable(callable)) {
- return true;
- }
-
- if (auto selectRow = callable.Maybe<TKiSelectRow>()) {
- return selectRow.Cast().Key().Raw() == key.Raw();
- }
-
- return ModerateCallables.contains(callable.CallableName());
-}
-
-struct TAnalyzeTxContext {
- TExprToNodeInfoMap NodesInfo;
- TVector<TScopedNode> ExecutionRoots;
-};
-
-void GatherNodeScopes(TExprBase root, TAnalyzeTxContext& analyzeCtx) {
- TNodeSet visitedNodes;
-
- TQueue<TMaybe<TExprScope>> scopesQueue;
- scopesQueue.push(TMaybe<TExprScope>());
-
- while (!scopesQueue.empty()) {
- auto scope = scopesQueue.front();
- scopesQueue.pop();
-
- auto scopeRoot = scope
- ? scope->Lambda.Body()
- : root;
-
- VisitExpr(scopeRoot.Ptr(), [scope, &scopesQueue, &analyzeCtx] (const TExprNode::TPtr& exprNode) {
- auto node = TExprBase(exprNode);
-
- if (node.Maybe<TCoLambda>()) {
- return false;
- }
-
- if (auto callable = node.Maybe<TCallable>()) {
- auto scopeDepth = scope ? scope->Depth : 0;
-
- for (const auto& arg : node.Cast<TVarArgCallable<TExprBase>>()) {
- if (auto lambda = arg.Maybe<TCoLambda>()) {
- TExprScope newScope(callable.Cast(), lambda.Cast(), scopeDepth + 1);
- scopesQueue.push(newScope);
-
- for (const auto& lambdaArg : lambda.Cast().Args()) {
- analyzeCtx.NodesInfo[lambdaArg.Raw()].Scope = newScope;
- }
- }
- }
- }
-
- return true;
- }, visitedNodes);
- };
-
-
- VisitExpr(root.Ptr(),
- [] (const TExprNode::TPtr& exprNode) {
- Y_UNUSED(exprNode);
- return true;
- },
- [&analyzeCtx] (const TExprNode::TPtr& exprNode) {
- auto node = TExprBase(exprNode);
-
- auto& nodeInfo = analyzeCtx.NodesInfo[node.Raw()];
- if (!node.Maybe<TCoArgument>()) {
- YQL_ENSURE(!nodeInfo.Scope);
- }
-
- TMaybe<TExprScope> scope = nodeInfo.Scope;
- for (const auto& child : exprNode->Children()) {
- if (TMaybeNode<TCoLambda>(child)) {
- continue;
- }
-
- auto childScope = analyzeCtx.NodesInfo[child.Get()].Scope;
- if (childScope) {
- if (scope) {
- scope = childScope->Depth > scope->Depth
- ? childScope
- : scope;
- } else {
- scope = childScope;
- }
- }
- }
-
- analyzeCtx.NodesInfo[node.Raw()].Scope = scope;
- return true;
- });
-}
-
-void RequireImmediate(TExprBase node, TAnalyzeTxContext& ctx) {
- ctx.NodesInfo[node.Raw()].RequireImmediate = true;
-}
-
-void RequireImmediateKey(TCoNameValueTupleList key, TAnalyzeTxContext& ctx) {
- for (auto tuple : key) {
- YQL_ENSURE(tuple.Value().IsValid());
- RequireImmediate(tuple.Value().Cast(), ctx);
- }
-}
-
-void RequireImmediateRange(TExprList range, TAnalyzeTxContext& ctx) {
- for (auto tuple : range) {
- if (auto columnRange = tuple.Maybe<TKiColumnRangeTuple>()) {
- RequireImmediate(columnRange.Cast().From(), ctx);
- RequireImmediate(columnRange.Cast().To(), ctx);
- }
- }
-}
-
-void RequireImmediateSettings(TCoNameValueTupleList settings, TAnalyzeTxContext& ctx) {
- for (auto setting : settings) {
- if (setting.Value() && setting.Value().Ref().IsComputable()) {
- RequireImmediate(setting.Value().Cast(), ctx);
- }
- }
-}
-
-void RequireEffectPayloadSafety(TCoNameValueTupleList key, TCoNameValueTupleList payload, TAnalyzeTxContext& ctx,
- ECommitSafety commitSafety)
-{
- switch (commitSafety) {
- case ECommitSafety::Full:
- for (auto tuple : payload) {
- YQL_ENSURE(tuple.Value().IsValid());
- RequireImmediate(tuple.Value().Cast(), ctx);
- }
- return;
-
- case ECommitSafety::Safe:
- case ECommitSafety::Moderate:
- break;
-
- default:
- YQL_ENSURE(false, "Unexpected commit safety level.");
- }
-
- VisitExpr(payload.Ptr(), [&ctx, key, commitSafety] (const TExprNode::TPtr& exprNode) {
- TExprBase node(exprNode);
-
- if (node.Maybe<TCoLambda>()) {
- return false;
- }
-
- if (node.Maybe<TCoArgument>()) {
- return false;
- }
-
- if (!node.Ref().IsComputable()) {
- return true;
- }
-
- if (ctx.NodesInfo[node.Raw()].IsImmediate) {
- return false;
- }
-
- if (auto maybeList = node.Maybe<TExprList>()) {
- return true;
- }
-
- if (auto maybeCallable = node.Maybe<TCallable>()) {
- auto callable = maybeCallable.Cast();
-
- bool safeCallable = commitSafety == ECommitSafety::Safe
- ? IsSafePayloadCallable(callable)
- : IsModeratePayloadCallable(key, callable);
-
- if (!safeCallable) {
- RequireImmediate(callable, ctx);
- return false;
- }
-
- for (const auto& arg : callable.Cast<TVarArgCallable<TExprBase>>()) {
- if (auto lambda = arg.Maybe<TCoLambda>()) {
- auto badNode = FindNode(lambda.Cast().Body().Ptr(),
- [key, commitSafety] (const TExprNode::TPtr& node) {
- if (!node->IsCallable()) {
- return false;
- }
-
- auto callable = TCallable(node);
- bool safeCallable = commitSafety == ECommitSafety::Safe
- ? IsSafePayloadCallable(callable)
- : IsModeratePayloadCallable(key, callable);
-
- return !safeCallable;
- });
-
- if (badNode) {
- RequireImmediate(callable, ctx);
- return false;
- }
- }
- }
-
- return true;
- }
-
- RequireImmediate(node, ctx);
- return false;
- });
-}
-
-void MarkImmediateNodes(TExprBase node, TAnalyzeTxContext& ctx) {
- if (node.Maybe<TCoDataType>() ||
- node.Maybe<TCoOptionalType>())
- {
- ctx.NodesInfo[node.Raw()].IsImmediate = true;
- }
-
- if (node.Maybe<TCoDataCtor>() || node.Maybe<TCoVoid>() || node.Maybe<TCoNothing>() || node.Maybe<TCoNull>()) {
- ctx.NodesInfo[node.Raw()].IsImmediate = true;
- }
-
- if (node.Maybe<TCoParameter>()) {
- ctx.NodesInfo[node.Raw()].IsImmediate = true;
- }
-
- if (node.Maybe<TKiMapParameter>() || node.Maybe<TKiFlatMapParameter>()) {
- auto mapLambda = node.Maybe<TKiMapParameter>()
- ? node.Cast<TKiMapParameter>().Lambda()
- : node.Cast<TKiFlatMapParameter>().Lambda();
-
- ctx.NodesInfo[mapLambda.Args().Arg(0).Raw()].IsImmediate = true;
- }
-}
-
-void MarkRequireImmediateNodes(TExprBase node, TAnalyzeTxContext& ctx) {
- if (auto selectRow = node.Maybe<TKiSelectRow>()) {
- RequireImmediateKey(selectRow.Cast().Key(), ctx);
- }
-
- if (auto selectRange = node.Maybe<TKiSelectRange>()) {
- RequireImmediateRange(selectRange.Cast().Range(), ctx);
- RequireImmediateSettings(selectRange.Cast().Settings(), ctx);
- }
-
- if (auto updateRow = node.Maybe<TKiUpdateRow>()) {
- RequireImmediateKey(updateRow.Cast().Key(), ctx);
- }
-
- if (auto eraseRow = node.Maybe<TKiEraseRow>()) {
- RequireImmediateKey(eraseRow.Cast().Key(), ctx);
- }
-
- if (node.Maybe<TKiMapParameter>() || node.Maybe<TKiFlatMapParameter>()) {
- TExprBase input = node.Maybe<TKiMapParameter>()
- ? node.Cast<TKiMapParameter>().Input()
- : node.Cast<TKiFlatMapParameter>().Input();
-
- ctx.NodesInfo[input.Raw()].RequireImmediate = true;
- }
-
- if (auto condEffect = node.Maybe<TKiConditionalEffect>()) {
- ctx.NodesInfo[condEffect.Cast().Predicate().Raw()].RequireImmediate = true;
- }
-}
-
-void PropagateImmediateNodes(TExprBase node, TAnalyzeTxContext& ctx) {
- if (auto just = node.Maybe<TCoJust>()) {
- if (ctx.NodesInfo[just.Cast().Input().Raw()].IsImmediate) {
- ctx.NodesInfo[node.Raw()].IsImmediate = true;
- }
- }
-
- if (auto nth = node.Maybe<TCoNth>()) {
- if (ctx.NodesInfo[nth.Cast().Tuple().Raw()].IsImmediate) {
- ctx.NodesInfo[node.Raw()].IsImmediate = true;
- }
- }
-
- if (auto member = node.Maybe<TCoMember>()) {
- if (ctx.NodesInfo[member.Cast().Struct().Raw()].IsImmediate) {
- ctx.NodesInfo[node.Raw()].IsImmediate = true;
- }
- }
-}
-
-void EnsureNodesSafety(TExprBase node, TAnalyzeTxContext& ctx, ECommitSafety commitSafety, bool topLevel) {
- if (auto updateRow = node.Maybe<TKiUpdateRow>()) {
- RequireEffectPayloadSafety(updateRow.Cast().Key(), updateRow.Cast().Update(), ctx,
- topLevel ? commitSafety : ECommitSafety::Full);
- }
-}
-
-bool EnsureEffectsSafety(TExprBase node, TAnalyzeTxContext& ctx) {
- bool hasNewRequirements = false;
-
- VisitExpr(node.Ptr(), [&ctx, &hasNewRequirements] (const TExprNode::TPtr& exprNode) {
- auto node = TExprBase(exprNode);
- auto& nodeInfo = ctx.NodesInfo[node.Raw()];
-
- if (nodeInfo.IsImmediate || nodeInfo.RequireImmediate) {
- return false;
- }
-
- if (node.Maybe<TKiUpdateRow>() || node.Maybe<TKiEraseRow>()) {
- return false;
- }
-
- if (node.Maybe<TKiSelectRow>() || node.Maybe<TKiSelectRange>()) {
- hasNewRequirements = true;
- RequireImmediate(node, ctx);
- return false;
- }
-
- return true;
- });
-
- return !hasNewRequirements;
-}
-
-void AnalyzeNode(TExprBase node, TAnalyzeTxContext& ctx) {
- auto& nodeInfo = *ctx.NodesInfo.FindPtr(node.Raw());
-
- nodeInfo.IsExecutable = true;
- nodeInfo.AreInputsExecutable = true;
-
- for (const auto& child : node.Ptr()->Children()) {
- const auto& childInfo = *ctx.NodesInfo.FindPtr(child.Get());
-
- bool canExecute = childInfo.IsExecutable;
- if (canExecute) {
- if (childInfo.RequireImmediate && !childInfo.IsImmediate) {
- ctx.ExecutionRoots.emplace_back(TExprBase(child), childInfo.Scope);
- canExecute = false;
- }
- }
-
- nodeInfo.IsExecutable = nodeInfo.IsExecutable && canExecute;
-
- if (!TMaybeNode<TCoLambda>(child)) {
- nodeInfo.AreInputsExecutable = nodeInfo.AreInputsExecutable && canExecute;
- }
-
- if (!nodeInfo.IsExecutable && !nodeInfo.AreInputsExecutable) {
- break;
- }
- }
-}
-
-void AnalyzeNodes(const TVector<TExprBase>& nodes, TAnalyzeTxContext& ctx) {
- YQL_ENSURE(ctx.ExecutionRoots.empty());
-
- for (auto& node : nodes) {
- AnalyzeNode(node, ctx);
- }
-}
-
-bool Analyze(const TExprNode::TPtr& exprRoot, TKqpAnalyzeResults& results, ECommitSafety commitSafety) {
- TAnalyzeTxContext analyzeCtx;
- TVector<TExprBase> nodes;
-
- GatherNodeScopes(TExprBase(exprRoot), analyzeCtx);
-
- VisitExpr(exprRoot,
- [&analyzeCtx] (const TExprNode::TPtr& exprNode) {
- auto node = TExprBase(exprNode);
-
- MarkImmediateNodes(node, analyzeCtx);
- MarkRequireImmediateNodes(node, analyzeCtx);
- return true;
- },
- [&analyzeCtx, &nodes, commitSafety] (const TExprNode::TPtr& exprNode) {
- auto node = TExprBase(exprNode);
- auto scope = analyzeCtx.NodesInfo[exprNode.Get()].Scope;
-
- PropagateImmediateNodes(node, analyzeCtx);
- EnsureNodesSafety(node, analyzeCtx, commitSafety, !scope);
-
- nodes.push_back(node);
- return true;
- });
-
- AnalyzeNodes(nodes, analyzeCtx);
-
- auto& rootInfo = analyzeCtx.NodesInfo[exprRoot.Get()];
-
- if (rootInfo.IsExecutable) {
- if (!EnsureEffectsSafety(TKiProgram(exprRoot).Effects(), analyzeCtx)) {
- AnalyzeNodes(nodes, analyzeCtx);
- }
- }
-
- results.CanExecute = rootInfo.IsExecutable;
- results.ExprToNodeInfoMap = std::move(analyzeCtx.NodesInfo);
-
- results.ExecutionRoots.clear();
- results.CallableToExecRootsMap.clear();
- results.LambdaToExecRootsMap.clear();
- for (auto& execRoot : analyzeCtx.ExecutionRoots) {
- if (execRoot.Scope) {
- auto callableInfo = results.ExprToNodeInfoMap.FindPtr(execRoot.Scope->Callable.Raw());
- YQL_ENSURE(callableInfo);
-
- if (!callableInfo->AreInputsExecutable) {
- continue;
- }
-
- results.LambdaToExecRootsMap[execRoot.Scope->Lambda.Raw()].push_back(execRoot.Node);
- results.CallableToExecRootsMap[execRoot.Scope->Callable.Raw()].push_back(execRoot.Node);
- }
-
- results.ExecutionRoots.push_back(execRoot);
- }
-
- YQL_ENSURE(results.CanExecute || !results.ExecutionRoots.empty());
-
- return true;
-}
-
-class TKqpAnalyzeTransformer : public TSyncTransformerBase {
-public:
- TKqpAnalyzeTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx)
- : TransformCtx(transformCtx) {}
-
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- Y_UNUSED(ctx);
- output = input;
-
- auto commitSafety = TransformCtx->CommitSafety();
- if (!TransformCtx->Config->HasAllowKqpUnsafeCommit()) {
- switch (commitSafety) {
- case ECommitSafety::Full:
- case ECommitSafety::Safe:
- break;
- default:
- ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::KIKIMR_BAD_OPERATION,
- "Unsafe commits not allowed for current database."));
- return TStatus::Error;
- }
- }
-
- if (!Analyze(input.Get(), TransformCtx->AnalyzeResults, commitSafety)) {
- return TStatus::Error;
- }
-
- YQL_CLOG(DEBUG, ProviderKqp) << "Analyze results:" << Endl
- << "CanExecute: " << TransformCtx->AnalyzeResults.CanExecute << Endl
- << "ExecutionRoots: " << TransformCtx->AnalyzeResults.ExecutionRoots.size();
- return TStatus::Ok;
- }
-
-private:
- TIntrusivePtr<TKqlTransformContext> TransformCtx;
-};
-
-} // namespace
-
-TAutoPtr<IGraphTransformer> CreateKqpAnalyzeTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx) {
- return new TKqpAnalyzeTransformer(transformCtx);
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/prepare/kqp_query_exec.cpp b/ydb/core/kqp/prepare/kqp_query_exec.cpp
index bc2d4e94ce9..bc71e4ec0b3 100644
--- a/ydb/core/kqp/prepare/kqp_query_exec.cpp
+++ b/ydb/core/kqp/prepare/kqp_query_exec.cpp
@@ -1,4 +1,4 @@
-#include "kqp_prepare_impl.h"
+#include "kqp_prepare.h"
#include <ydb/core/engine/mkql_engine_flat.h>
#include <ydb/core/kqp/common/kqp_yql.h>
@@ -17,671 +17,6 @@ using namespace NYql::NNodes;
using namespace NYql::NCommon;
using namespace NThreading;
-namespace {
-
-struct TTransformState {
- TTransformState(TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx)
- : ParamsState(txState->Tx().ParamsState)
- , TransformCtx(transformCtx) {}
-
- TIntrusivePtr<TKqpTransactionContext::TParamsState> ParamsState;
- TIntrusivePtr<TKqlTransformContext> TransformCtx;
-};
-
-struct TParamBinding {
- TString Name;
- TMaybe<ui32> ResultIndex;
-
- TParamBinding(const TString& name, TMaybe<ui32> resultIndex = {})
- : Name(name)
- , ResultIndex(resultIndex) {}
-};
-
-NKikimrKqp::TParameterBinding GetParameterBinding(TCoParameter param) {
- auto name = TString(param.Name().Value());
-
- NKikimrKqp::TParameterBinding binding;
- binding.SetName(name);
- if (param.Ref().HasResult()) {
- auto indexTuple = TCoAtomList(TExprNode::GetResult(param.Ptr()));
- ui32 mkqlIndex = FromString<ui32>(indexTuple.Item(0).Value());
- ui32 resultIndex = FromString<ui32>(indexTuple.Item(1).Value());
- binding.SetMkqlIndex(mkqlIndex);
- binding.SetResultIndex(resultIndex);
- }
-
- return binding;
-}
-
-NDq::TMkqlValueRef GetParamFromResult(const NKikimrKqp::TParameterBinding& binding,
- const TKqlTransformContext& transformCtx)
-{
- YQL_ENSURE(binding.HasMkqlIndex());
- YQL_ENSURE(binding.HasResultIndex());
- YQL_ENSURE(binding.GetMkqlIndex() < transformCtx.MkqlResults.size());
-
- const NKikimrMiniKQL::TType* type;
- const NKikimrMiniKQL::TValue* value;
- GetKikimrUnpackedRunResult(*transformCtx.MkqlResults[binding.GetMkqlIndex()], binding.GetResultIndex(),
- type, value);
- YQL_ENSURE(type);
- YQL_ENSURE(value);
-
- return NDq::TMkqlValueRef(*type, *value);
-}
-
-NKikimrMiniKQL::TParams BuildParamFromResult(const NKikimrKqp::TParameterBinding& binding,
- const TKqlTransformContext& transformCtx)
-{
- auto valueRef = GetParamFromResult(binding, transformCtx);
-
- NKikimrMiniKQL::TParams param;
- param.MutableType()->CopyFrom(valueRef.GetType());
- param.MutableValue()->CopyFrom(valueRef.GetValue());
- return param;
-}
-
-bool GetPredicateValue(const TKiConditionalEffect& effect, const TKqlTransformContext& transformCtx,
- const THashMap<TString, NKikimrKqp::TParameterBinding>& bindingsMap)
-{
- YQL_ENSURE(effect.Predicate().Maybe<TCoParameter>());
- auto paramName = effect.Predicate().Cast<TCoParameter>().Name().Value();
-
- auto binding = bindingsMap.FindPtr(paramName);
- YQL_ENSURE(binding);
- YQL_ENSURE(binding->HasMkqlIndex());
-
- auto paramValue = GetParamFromResult(*binding, transformCtx);
- YQL_ENSURE(paramValue.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::Data);
- YQL_ENSURE(paramValue.GetType().GetData().GetScheme() == NKikimr::NScheme::NTypeIds::Bool);
- return paramValue.GetValue().GetBool();
-}
-
-bool ProcessEffect(TExprBase& effect, const THashMap<TString, NKikimrKqp::TParameterBinding>& bindingsMap,
- const TKqlTransformContext& transformCtx, TExprContext& ctx)
-{
- TOptimizeExprSettings optSettings(nullptr);
- optSettings.VisitChanges = true;
- TExprNode::TPtr output;
- auto status = OptimizeExpr(effect.Ptr(), output,
- [&bindingsMap, &transformCtx](const TExprNode::TPtr& input, TExprContext& ctx) {
- TExprBase node(input);
-
- if (auto maybeRevert = node.Maybe<TKiRevertIf>()) {
- auto revert = maybeRevert.Cast();
- auto predicateValue = GetPredicateValue(revert, transformCtx, bindingsMap);
-
- if (predicateValue) {
- ctx.AddWarning(YqlIssue(ctx.GetPosition(revert.Pos()), TIssuesIds::KIKIMR_OPERATION_REVERTED, TStringBuilder()
- << "Operation reverted due to constraint violation: " << revert.Constraint().Value()));
-
- return GetEmptyEffectsList(revert.Pos(), ctx).Ptr();
- }
-
- return revert.Effect().Ptr();
- }
-
- if (auto maybeAbort = node.Maybe<TKiAbortIf>()) {
- auto abort = maybeAbort.Cast();
- auto predicateValue = GetPredicateValue(abort, transformCtx, bindingsMap);
-
- if (predicateValue) {
- ctx.AddError(YqlIssue(ctx.GetPosition(abort.Pos()), TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, TStringBuilder()
- << "Operation aborted due to constraint violation: " << abort.Constraint().Value()));
-
- return TExprNode::TPtr();
- }
-
- return abort.Effect().Ptr();
- }
-
- return input;
- }, ctx, optSettings);
-
- if (status == IGraphTransformer::TStatus::Ok) {
- effect = TExprBase(output);
- return true;
- }
-
- YQL_ENSURE(status == IGraphTransformer::TStatus::Error);
- return false;
-}
-
-NNodes::TExprBase PreserveParams(NNodes::TExprBase node,
- const THashMap<TString, NKikimrKqp::TParameterBinding>& bindingsMap, TExprContext& ctx,
- TKqpTransactionState& txState, const TKqlTransformContext& transformCtx, bool preserveValues)
-{
- TNodeOnNodeOwnedMap replaceMap;
- THashMap<TString, TExprBase> resultsMap;
- VisitExpr(node.Ptr(), [&replaceMap, &resultsMap, &ctx, &bindingsMap, &txState, &transformCtx, preserveValues] (const TExprNode::TPtr& node) {
- if (auto maybeParam = TMaybeNode<TCoParameter>(node)) {
- auto param = maybeParam.Cast();
- auto name = TString(param.Name().Value());
-
- auto bindingPtr = bindingsMap.FindPtr(name);
- YQL_ENSURE(bindingPtr);
-
- auto& binding = *bindingPtr;
-
- TMaybe<NKikimrMiniKQL::TParams> paramValue;
- TMaybeNode<TExprBase> resultNode;
- if (preserveValues) {
- if (binding.HasMkqlIndex()) {
- paramValue = BuildParamFromResult(binding, transformCtx);
- } else {
- if (!txState.Tx().ParamsState->Values.contains(binding.GetName())) {
- auto clientParam = transformCtx.QueryCtx->Parameters.FindPtr(binding.GetName());
- YQL_ENSURE(clientParam);
- paramValue = *clientParam;
- }
- }
- } else {
- if (!param.Ref().HasResult() && binding.HasMkqlIndex()) {
- resultNode = Build<TCoAtomList>(ctx, param.Pos())
- .Add().Build(ToString(binding.GetMkqlIndex()))
- .Add().Build(ToString(binding.GetResultIndex()))
- .Done();
- }
- }
-
- if (!paramValue && !resultNode) {
- return true;
- }
-
- auto newParamName = txState.Tx().NewParamName();
-
- auto newParamNode = Build<TCoParameter>(ctx, node->Pos())
- .Name().Build(newParamName)
- .Type(param.Type())
- .Done();
-
- replaceMap.emplace(node.Get(), newParamNode.Ptr());
-
- if (paramValue) {
- YQL_ENSURE(txState.Tx().ParamsState->Values.emplace(std::make_pair(newParamName, *paramValue)).second);
- }
-
- if (resultNode) {
- YQL_ENSURE(resultsMap.emplace(std::make_pair(newParamName, resultNode.Cast())).second);
- }
- }
-
- return true;
- });
-
- auto newNode = TExprBase(ctx.ReplaceNodes(node.Ptr(), replaceMap));
-
- if (!resultsMap.empty()) {
- VisitExpr(newNode.Ptr(), [&resultsMap] (const TExprNode::TPtr& node) {
- if (auto maybeParam = TMaybeNode<TCoParameter>(node)) {
- auto param = maybeParam.Cast();
- auto name = TString(param.Name().Value());
-
- auto result = resultsMap.FindPtr(name);
- if (result) {
- param.Ptr()->SetResult(result->Ptr());
- }
- }
-
- return true;
- });
- }
-
- return newNode;
-}
-
-class TKqpExecTransformer : public TGraphTransformerBase {
-public:
- TKqpExecTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx)
- : Gateway(gateway)
- , Cluster(cluster)
- , TxState(txState)
- , TransformCtx(transformCtx) {}
-
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- output = input;
-
- const auto& analyzeResults = TransformCtx->AnalyzeResults;
-
- TVector<TExprBase> results;
- for (auto& root : analyzeResults.ExecutionRoots) {
- if (root.Scope) {
- ctx.AddError(TIssue(ctx.GetPosition(root.Node.Pos()), TStringBuilder()
- << "Unexpected nested execution roots in rewritten program."));
- return TStatus::Error;
- }
-
- results.push_back(TExprBase(root.Node));
- }
-
- TMaybeNode<TExprList> resultsNode;
- if (analyzeResults.CanExecute) {
- auto kqpProgram = TKiProgram(input);
-
- if (HasEffects(kqpProgram)) {
- bool preserveParams = !TransformCtx->Settings.GetCommitTx();
- if (!AddDeferredEffect(kqpProgram.Effects(), ctx, *TxState, *TransformCtx, preserveParams)) {
- return TStatus::Error;
- }
- }
-
- if (!HasResults(kqpProgram)) {
- TransformCtx->MkqlResults.push_back(nullptr);
- return TStatus::Ok;
- }
-
- if (TransformCtx->Settings.GetCommitTx() && !TransformCtx->QueryCtx->PrepareOnly) {
- if (TxState->Tx().DeferredEffects.Empty()) {
- // Merge read-only query with commit tx
- return TStatus::Ok;
- }
- }
-
- resultsNode = kqpProgram.Results();
- } else {
- YQL_ENSURE(!results.empty());
- resultsNode = Build<TExprList>(ctx, input->Pos())
- .Add(results)
- .Done();
- }
-
- auto effectsNode = Build<TCoAsList>(ctx, input->Pos())
- .Add<TCoIf>()
- .Predicate<TCoParameter>()
- .Name().Build(LocksAcquireParamName)
- .Type<TCoDataType>()
- .Type().Build("Bool")
- .Build()
- .Build()
- .ThenValue<TKiAcquireLocks>()
- .LockTxId<TCoParameter>()
- .Name().Build(LocksTxIdParamName)
- .Type<TCoDataType>()
- .Type().Build("Uint64")
- .Build()
- .Build()
- .Build()
- .ElseValue<TCoVoid>()
- .Build()
- .Build()
- .Done();
-
- YQL_ENSURE(resultsNode);
- auto program = Build<TKiProgram>(ctx, input->Pos())
- .Results(resultsNode.Cast())
- .Effects(effectsNode)
- .Done();
-
- Promise = NewPromise();
- MkqlExecuteResult = ExecuteMkql(program, Gateway, Cluster, ctx, TxState, TransformCtx, false);
-
- auto promise = Promise;
- MkqlExecuteResult.Future.Apply([promise](const TFuture<IKqpGateway::TMkqlResult> future) mutable {
- YQL_ENSURE(future.HasValue());
- promise.SetValue();
- });
-
- return TStatus::Async;
- }
-
- TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
- Y_UNUSED(input);
- return Promise.GetFuture();
- }
-
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- output = input;
-
- NKikimr::NKqp::IKqpGateway::TMkqlResult result(MkqlExecuteResult.Future.ExtractValue());
-
- result.ReportIssues(ctx.IssueManager);
- if (!result.Success()) {
- return TStatus::Error;
- }
-
- auto mkqlResult = MakeSimpleShared<NKikimrMiniKQL::TResult>();
- mkqlResult->Swap(&result.Result);
-
- TransformCtx->MkqlResults.push_back(mkqlResult);
-
- if (TransformCtx->QueryCtx->PrepareOnly) {
- YQL_ENSURE(!TransformCtx->GetPreparingKql().GetMkqls().empty());
- auto& mkql = *TransformCtx->GetPreparingKql().MutableMkqls()->rbegin();
- mkql.SetProgram(result.CompiledProgram);
- mkql.SetProgramText(MkqlExecuteResult.Program);
- } else {
- LogMkqlResult(*mkqlResult, ctx);
- TransformCtx->AddMkqlStats(MkqlExecuteResult.Program, std::move(result.TxStats));
-
- if (TxState->Tx().EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
- if (!UnpackMergeLocks(*mkqlResult, TxState->Tx(), ctx)) {
- return TStatus::Error;
- }
- }
- }
-
- if (TransformCtx->AnalyzeResults.CanExecute) {
- output = Build<TKiProgram>(ctx, input->Pos())
- .Results()
- .Build()
- .Effects(TKiProgram(input).Effects())
- .Done()
- .Ptr();
- }
-
- return TStatus::Ok;
- }
-
- void Rewind() override {}
-
-private:
- TIntrusivePtr<IKqpGateway> Gateway;
- TString Cluster;
- TIntrusivePtr<TKqpTransactionState> TxState;
- TIntrusivePtr<TKqlTransformContext> TransformCtx;
- TMkqlExecuteResult MkqlExecuteResult;
- TPromise<void> Promise;
-};
-
-void ExtractQueryStats(NKqpProto::TKqpStatsQuery& dst, const NKikimrQueryStats::TTxStats& txStats) {
- auto& dstExec = *dst.AddExecutions();
- NKqpProto::TKqpExecutionExtraStats executionExtraStats;
-
- dstExec.SetDurationUs(txStats.GetDurationUs());
- dstExec.SetCpuTimeUs(txStats.GetComputeCpuTimeUsec());
-
- auto& dstComputeTime = *executionExtraStats.MutableComputeCpuTimeUs();
- dstComputeTime.SetMin(txStats.GetComputeCpuTimeUsec());
- dstComputeTime.SetMax(txStats.GetComputeCpuTimeUsec());
- dstComputeTime.SetSum(txStats.GetComputeCpuTimeUsec());
- dstComputeTime.SetCnt(1);
-
- {
- i64 cnt = 0;
- ui64 minCpu = Max<ui64>();
- ui64 maxCpu = 0;
- ui64 sumCpu = 0;
- ui64 sumReadSets = 0;
- ui64 maxProgramSize = 0;
- ui64 maxReplySize = 0;
- for (const auto& perShard : txStats.GetPerShardStats()) {
- ui64 cpu = perShard.GetCpuTimeUsec();
- minCpu = Min(minCpu, cpu);
- maxCpu = Max(maxCpu, cpu);
- sumCpu += cpu;
- sumReadSets += perShard.GetOutgoingReadSetsCount();
- maxProgramSize = Max(maxProgramSize, perShard.GetProgramSize());
- maxReplySize = Max(maxReplySize, perShard.GetReplySize());
- ++cnt;
- }
- if (cnt) {
- auto& dstShardTime = *executionExtraStats.MutableShardsCpuTimeUs();
- dstShardTime.SetMin(minCpu);
- dstShardTime.SetMax(maxCpu);
- dstShardTime.SetSum(sumCpu);
- dstShardTime.SetCnt(cnt);
-
- dst.SetReadSetsCount(dst.GetReadSetsCount() + sumReadSets);
- dst.SetMaxShardProgramSize(Max(dst.GetMaxShardProgramSize(), maxProgramSize));
- dst.SetMaxShardReplySize(Max(dst.GetMaxShardReplySize(), maxReplySize));
-
- dstExec.SetCpuTimeUs(dstExec.GetCpuTimeUs() + sumCpu);
- }
- }
-
- ui32 affectedShards = 0;
- for (auto& table : txStats.GetTableAccessStats()) {
- auto& dstTable = *dstExec.AddTables();
- dstTable.SetTablePath(table.GetTableInfo().GetName());
- dstTable.SetReadRows(table.GetSelectRow().GetRows() + table.GetSelectRange().GetRows());
- dstTable.SetReadBytes(table.GetSelectRow().GetBytes() + table.GetSelectRange().GetBytes());
- dstTable.SetWriteRows(table.GetUpdateRow().GetRows());
- dstTable.SetWriteBytes(table.GetUpdateRow().GetBytes());
- dstTable.SetEraseRows(table.GetEraseRow().GetRows());
- dstTable.SetAffectedPartitions(table.GetShardCount());
-
- // NOTE: This might be incorrect in case when single shard has several
- // tables, i.e. collocated tables.
- affectedShards += table.GetShardCount();
- }
-
- executionExtraStats.SetAffectedShards(affectedShards);
-
- dstExec.MutableExtra()->PackFrom(executionExtraStats);
-}
-
-} // namespace
-
-void TKqlTransformContext::AddMkqlStats(const TString& program, NKikimrQueryStats::TTxStats&& txStats) {
- Y_UNUSED(program);
-
- ExtractQueryStats(QueryStats, txStats);
-}
-
-IKqpGateway::TMkqlSettings TKqlTransformContext::GetMkqlSettings(bool hasDataEffects, TInstant now) const {
- IKqpGateway::TMkqlSettings mkqlSettings;
- mkqlSettings.LlvmRuntime = false;
- mkqlSettings.CollectStats = QueryCtx->StatsMode >= EKikimrStatsMode::Basic;
-
- if (hasDataEffects) {
- mkqlSettings.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef();
- }
-
- auto& deadlines = QueryCtx->Deadlines;
- if (deadlines.CancelAt) {
- mkqlSettings.CancelAfterMs = now < deadlines.CancelAt ? (deadlines.CancelAt - now).MilliSeconds() : 1;
- }
- if (deadlines.TimeoutAt) {
- mkqlSettings.TimeoutMs = now < deadlines.TimeoutAt ? (deadlines.TimeoutAt - now).MilliSeconds() : 1;
- }
-
- mkqlSettings.Limits = QueryCtx->Limits.PhaseLimits;
-
- return mkqlSettings;
-}
-
-TMkqlExecuteResult ExecuteMkql(TKiProgram program, TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TExprContext& ctx, TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx,
- bool hasDataEffects)
-{
- auto mkqlProgram = TranslateToMkql(program, ctx, TString(ReadTargetParamName));
- if (!mkqlProgram) {
- return TMkqlExecuteResult(MakeFuture(ResultFromError<IKqpGateway::TMkqlResult>(
- "Mkql translation failed.", ctx.GetPosition(program.Pos()))));
- }
-
- auto mkqlProgramText = NCommon::SerializeExpr(ctx, mkqlProgram.Cast().Ref());
- TFuture<IKqpGateway::TMkqlResult> future;
-
- auto paramBindings = CollectParams(mkqlProgram.Cast());
-
- if (transformCtx->QueryCtx->PrepareOnly) {
- YQL_CLOG(INFO, ProviderKqp) << "Preparing MiniKQL program:" << Endl << mkqlProgramText;
-
- auto& mkql = *transformCtx->GetPreparingKql().AddMkqls();
- for (auto& binding : paramBindings) {
- mkql.AddBindings()->CopyFrom(binding);
- }
-
- mkql.SetIsPure(!hasDataEffects && IsKqlPureExpr(program));
-
- future = gateway->PrepareMkql(cluster, mkqlProgramText);
- } else {
- YQL_CLOG(INFO, ProviderKqp) << "Executing MiniKQL program:" << Endl << mkqlProgramText;
-
- bool acquireLocks = *txState->Tx().EffectiveIsolationLevel == NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE
- && !txState->Tx().Locks.Broken();
- auto execParams = BuildParamsMap(paramBindings, txState, transformCtx, acquireLocks);
-
- if (YQL_CLOG_ACTIVE(TRACE, ProviderKqp)) {
- TStringBuilder paramsTextBuilder;
- for (auto& pair : execParams.Values) {
- TString paramText;
- NProtoBuf::TextFormat::PrintToString(pair.second.GetValue(), &paramText);
- paramsTextBuilder << pair.first << ": " << paramText << Endl;
- }
-
- YQL_CLOG(TRACE, ProviderKqp) << "MiniKQL parameters:" << Endl << paramsTextBuilder;
- }
-
- future = gateway->ExecuteMkql(cluster, mkqlProgramText, std::move(execParams),
- transformCtx->GetMkqlSettings(hasDataEffects, gateway->GetCurrentTime()), txState->Tx().GetSnapshot());
- }
-
- return TMkqlExecuteResult(mkqlProgramText, future);
-}
-
-bool AddDeferredEffect(NNodes::TExprBase effect, const TVector<NKikimrKqp::TParameterBinding>& bindings,
- TExprContext& ctx, TKqpTransactionState& txState, TKqlTransformContext& transformCtx, bool preserveParamValues)
-{
- if (transformCtx.QueryCtx->PrepareOnly) {
- auto& newEffect = *transformCtx.GetPreparingKql().AddEffects();
- newEffect.SetNodeAst(NCommon::SerializeExpr(ctx, effect.Ref()));
- for (auto& binding : bindings) {
- newEffect.AddBindings()->CopyFrom(binding);
- }
- } else {
- if (txState.Tx().Locks.Broken()) {
- txState.Tx().Locks.ReportIssues(ctx);
- return false;
- }
- THashMap<TString, NKikimrKqp::TParameterBinding> bindingsMap;
- for (auto& binding : bindings) {
- bindingsMap.emplace(binding.GetName(), binding);
- }
-
- if (!ProcessEffect(effect, bindingsMap, transformCtx, ctx)) {
- return false;
- }
-
- effect = PreserveParams(effect, bindingsMap, ctx, txState, transformCtx, preserveParamValues);
- }
-
- bool added = txState.Tx().AddDeferredEffect(effect);
- YQL_ENSURE(added, "Cannot execute new- and old- execution engine queries in the same transaction");
-
- YQL_CLOG(INFO, ProviderKqp) << "Adding deferred effect, total " << txState.Tx().DeferredEffects.Size() << ": "
- << Endl << KqpExprToPrettyString(effect, ctx);
- return true;
-}
-
-bool AddDeferredEffect(NNodes::TExprBase effect, TExprContext& ctx, TKqpTransactionState& txState,
- TKqlTransformContext& transformCtx, bool preserveParamValues)
-{
- return AddDeferredEffect(effect, CollectParams(effect), ctx, txState, transformCtx, preserveParamValues);
-}
-
-TKqpParamsMap BuildParamsMap(const TVector<NKikimrKqp::TParameterBinding>& bindings,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx, bool acquireLocks)
-{
- TKqpParamsMap paramsMap(std::make_shared<TTransformState>(txState, transformCtx));
-
- for (auto& binding : bindings) {
- auto name = binding.GetName();
-
- TMaybe<NDq::TMkqlValueRef> paramRef;
- if (binding.GetName() == LocksAcquireParamName) {
- auto& param = txState->Tx().ParamsState->Values[LocksAcquireParamName];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<bool>::Id);
- param.MutableValue()->SetBool(acquireLocks);
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == LocksTxIdParamName) {
- auto& param = txState->Tx().ParamsState->Values[LocksTxIdParamName];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui64>::Id);
- param.MutableValue()->SetUint64(txState->Tx().Locks.GetLockTxId());
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == ReadTargetParamName) {
- YQL_ENSURE(txState->Tx().EffectiveIsolationLevel);
-
- ui32 readTarget = (ui32)NKikimr::TReadTarget::EMode::Online;
- switch (*txState->Tx().EffectiveIsolationLevel) {
- case NKikimrKqp::ISOLATION_LEVEL_READ_UNCOMMITTED:
- readTarget = (ui32)NKikimr::TReadTarget::EMode::Head;
- break;
- case NKikimrKqp::ISOLATION_LEVEL_READ_STALE:
- readTarget = (ui32)NKikimr::TReadTarget::EMode::Follower;
- break;
- default:
- break;
- }
-
- auto& param = txState->Tx().ParamsState->Values[ReadTargetParamName];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui32>::Id);
- param.MutableValue()->SetUint32(readTarget);
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == NowParamName) {
- auto& param = txState->Tx().ParamsState->Values[binding.GetName()];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui64>::Id);
- param.MutableValue()->SetUint64(transformCtx->QueryCtx->GetCachedNow());
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == CurrentDateParamName) {
- auto& param = txState->Tx().ParamsState->Values[binding.GetName()];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TDate>::Id);
- ui64 date = transformCtx->QueryCtx->GetCachedDate();
- YQL_ENSURE(date <= Max<ui32>());
- param.MutableValue()->SetUint32(static_cast<ui32>(date));
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == CurrentDatetimeParamName) {
- auto& param = txState->Tx().ParamsState->Values[binding.GetName()];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TDatetime>::Id);
- ui64 datetime = transformCtx->QueryCtx->GetCachedDatetime();
- YQL_ENSURE(datetime <= Max<ui32>());
- param.MutableValue()->SetUint32(static_cast<ui32>(datetime));
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == CurrentTimestampParamName) {
- auto& param = txState->Tx().ParamsState->Values[binding.GetName()];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TTimestamp>::Id);
- param.MutableValue()->SetUint64(transformCtx->QueryCtx->GetCachedTimestamp());
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == RandomNumberParamName) {
- auto& param = txState->Tx().ParamsState->Values[binding.GetName()];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<ui64>::Id);
- param.MutableValue()->SetUint64(transformCtx->QueryCtx->GetCachedRandom<ui64>());
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == RandomParamName) {
- auto& param = txState->Tx().ParamsState->Values[binding.GetName()];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<double>::Id);
- param.MutableValue()->SetDouble(transformCtx->QueryCtx->GetCachedRandom<double>());
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.GetName() == RandomUuidParamName) {
- auto& param = txState->Tx().ParamsState->Values[binding.GetName()];
- param.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- param.MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<NUdf::TUuid>::Id);
- auto uuid = transformCtx->QueryCtx->GetCachedRandom<TGUID>();
- param.MutableValue()->SetBytes(uuid.dw, sizeof(TGUID));
- paramRef = NDq::TMkqlValueRef(param);
- } else if (binding.HasMkqlIndex()) {
- paramRef = GetParamFromResult(binding, *transformCtx);
- } else {
- auto clientParam = transformCtx->QueryCtx->Parameters.FindPtr(binding.GetName());
- if (clientParam) {
- paramRef = NDq::TMkqlValueRef(*clientParam);
- } else {
- auto paramValue = txState->Tx().ParamsState->Values.FindPtr(binding.GetName());
- YQL_ENSURE(paramValue, "Parameter not found: " << binding.GetName());
-
- paramRef = NDq::TMkqlValueRef(*paramValue);
- }
- }
-
- YQL_ENSURE(paramRef);
- auto result = paramsMap.Values.emplace(std::make_pair(name, *paramRef));
- YQL_ENSURE(result.second);
- }
-
- return paramsMap;
-}
-
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
TStringBuilder message;
message << "Transaction locks invalidated.";
@@ -791,54 +126,5 @@ bool UnpackMergeLocks(const NKikimrMiniKQL::TResult& result, TKqpTransactionCont
return false;
}
-void LogMkqlResult(const NKikimrMiniKQL::TResult& result, TExprContext& ctx) {
- Y_UNUSED(ctx);
-
- if (YQL_CLOG_ACTIVE(TRACE, ProviderKqp)) {
- TString resultType;
- TString resultValue;
-
- NProtoBuf::TextFormat::PrintToString(result.GetType(), &resultType);
- NProtoBuf::TextFormat::PrintToString(result.GetValue(), &resultValue);
-
- YQL_CLOG(TRACE, ProviderKqp) << "MiniKQL results\n"
- << "Type:\n" << resultType
- << "Value:\n" << resultValue;
- }
-}
-
-bool HasEffects(const TKiProgram& program) {
- return !program.Effects().Maybe<TCoList>();
-}
-
-bool HasResults(const TKiProgram& program) {
- return !program.Results().Empty();
-}
-
-TVector<NKikimrKqp::TParameterBinding> CollectParams(TExprBase query) {
- TSet<TStringBuf> parametersSet;
- TVector<NKikimrKqp::TParameterBinding> bindings;
-
- VisitExpr(query.Ptr(), [&bindings, &parametersSet] (const TExprNode::TPtr& node) {
- if (auto maybeParam = TMaybeNode<TCoParameter>(node)) {
- auto param = maybeParam.Cast();
- auto result = parametersSet.insert(param.Name());
- if (result.second) {
- bindings.push_back(GetParameterBinding(param));
- }
- }
-
- return true;
- });
-
- return bindings;
-}
-
-TAutoPtr<IGraphTransformer> CreateKqpExecTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx)
-{
- return new TKqpExecTransformer(gateway, cluster, txState, transformCtx);
-}
-
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/prepare/kqp_query_finalize.cpp b/ydb/core/kqp/prepare/kqp_query_finalize.cpp
deleted file mode 100644
index 5e442780f7a..00000000000
--- a/ydb/core/kqp/prepare/kqp_query_finalize.cpp
+++ /dev/null
@@ -1,638 +0,0 @@
-#include "kqp_prepare_impl.h"
-
-#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
-#include <ydb/core/tx/datashard/sys_tables.h>
-
-#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/core/issue/yql_issue.h>
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NYql;
-using namespace NYql::NCommon;
-using namespace NYql::NNodes;
-using namespace NThreading;
-
-namespace {
-
-const TStringBuf LocksInvalidatedResultName = "tx_locks_invalidated";
-const TStringBuf LocksInvalidatedListName = "tx_locks_invalidated_list";
-const TStringBuf LocksTableName = "/sys/locks2";
-const TStringBuf LocksTableVersion = "0";
-const TString LocksTablePathId = TKikimrPathId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks2).ToString();
-const ui64 LocksInvalidatedCount = 1;
-
-TExprBase GetDeferredEffectsList(const TDeferredEffects& effects, TPositionHandle pos, TExprContext& ctx) {
- if (effects.Empty()) {
- return GetEmptyEffectsList(pos, ctx);
- }
-
- TVector<TExprBase> effectNodes;
- effectNodes.reserve(effects.Size());
- for (const auto& effect : effects) {
- YQL_ENSURE(effect.Params.empty());
- YQL_ENSURE(effect.Node);
- effectNodes.push_back(effect.Node.Cast());
- }
-
- return Build<TCoExtend>(ctx, pos)
- .Add(effectNodes)
- .Done();
-}
-
-TExprBase GetEraseLocksEffects(const TString& cluster, TPositionHandle pos, TCoParameter locksList, TExprContext& ctx) {
- return Build<TKiMapParameter>(ctx, pos)
- .Input(locksList)
- .Lambda()
- .Args({"lockItem"})
- .Body<TKiEraseRow>()
- .Cluster().Build(cluster)
- .Table<TKiVersionedTable>()
- .Path<TCoAtom>().Build(LocksTableName)
- .SchemaVersion<TCoAtom>().Build(LocksTableVersion)
- .PathId<TCoAtom>().Build(LocksTablePathId)
- .Build()
- .Key()
- .Add()
- .Name().Build("LockId")
- .Value<TCoMember>()
- .Struct("lockItem")
- .Name().Build("LockId")
- .Build()
- .Build()
- .Add()
- .Name().Build("DataShard")
- .Value<TCoMember>()
- .Struct("lockItem")
- .Name().Build("DataShard")
- .Build()
- .Build()
- .Add()
- .Name().Build("SchemeShard")
- .Value<TCoMember>()
- .Struct("lockItem")
- .Name().Build("SchemeShard")
- .Build()
- .Build()
- .Add()
- .Name().Build("PathId")
- .Value<TCoMember>()
- .Struct("lockItem")
- .Name().Build("PathId")
- .Build()
- .Build()
- .Build()
- .Build()
- .Build()
- .Done();
-}
-
-const TTypeAnnotationNode* GetTxLockListType(TExprContext& ctx) {
- auto ui32Type = ctx.MakeType<TDataExprType>(EDataSlot::Uint32);
- auto ui64Type = ctx.MakeType<TDataExprType>(EDataSlot::Uint64);
- TVector<const TItemExprType*> lockItems;
- lockItems.reserve(6);
- lockItems.push_back(ctx.MakeType<TItemExprType>("LockId", ui64Type));
- lockItems.push_back(ctx.MakeType<TItemExprType>("DataShard", ui64Type));
- lockItems.push_back(ctx.MakeType<TItemExprType>("SchemeShard", ui64Type));
- lockItems.push_back(ctx.MakeType<TItemExprType>("PathId", ui64Type));
- lockItems.push_back(ctx.MakeType<TItemExprType>("Generation", ui32Type));
- lockItems.push_back(ctx.MakeType<TItemExprType>("Counter", ui64Type));
- auto lockType = ctx.MakeType<TStructExprType>(lockItems);
- auto lockListType = ctx.MakeType<TListExprType>(lockType);
- return lockListType;
-}
-
-class TKqpFinalizeTransformer : public TGraphTransformerBase {
-public:
- TKqpFinalizeTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx)
- : Gateway(gateway)
- , Cluster(cluster)
- , TxState(txState)
- , TransformCtx(transformCtx)
- , HasProgramResults(false) {}
-
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- output = input;
-
- YQL_ENSURE(State == EFinalizeState::Initial);
-
- auto resultsNode = GetResultsNode(TExprBase(input), ctx);
- TExprBase effectsNode = GetEmptyEffectsList(input->Pos(), ctx);
-
- auto settings = TransformCtx->Settings;
-
- if (settings.GetRollbackTx()) {
- YQL_ENSURE(!settings.GetCommitTx());
-
- YQL_CLOG(INFO, ProviderKqp) << "Rollback Tx"
- << ", deferred effects count: " << TxState->Tx().DeferredEffects.Size()
- << ", locks count: " << TxState->Tx().Locks.Size();
-
- effectsNode = GetRollbackEffects(input->Pos(), ctx);
- State = EFinalizeState::RollbackInProgress;
- }
-
- bool hasDataEffects = false;
- if (settings.GetCommitTx()) {
- YQL_ENSURE(!settings.GetRollbackTx());
-
- effectsNode = GetCommitEffects(input->Pos(), ctx, hasDataEffects);
-
- {
- if (!CheckCommitEffects(effectsNode, ctx)) {
- return RollbackOnError(ctx);
- }
-
- if (TxState->Tx().IsInvalidated()) {
- ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::KIKIMR_OPERATION_ABORTED, TStringBuilder()
- << "Failed to commit transaction due to previous errors."));
- return RollbackOnError(ctx);
- }
- }
-
- YQL_CLOG(INFO, ProviderKqp) << "Commit Tx"
- << ", deferred effects count: " << TxState->Tx().DeferredEffects.Size()
- << ", locks count: " << TxState->Tx().Locks.Size();
-
- State = EFinalizeState::CommitInProgress;
- }
-
- auto program = Build<TKiProgram>(ctx, input->Pos())
- .Results(resultsNode)
- .Effects(effectsNode)
- .Done();
-
- HasProgramResults = HasResults(program);
- if (!HasProgramResults && !HasEffects(program)) {
- if (State != EFinalizeState::Initial) {
- ResetTxState(State == EFinalizeState::CommitInProgress);
- }
-
- State = EFinalizeState::Initial;
- return TStatus::Ok;
- }
-
- if (TransformCtx->QueryCtx->PrepareOnly) {
- YQL_ENSURE(!HasProgramResults);
- State = EFinalizeState::Initial;
- return TStatus::Ok;
- }
-
- return ExecuteProgram(program, ctx, hasDataEffects, ShouldWaitForResults(hasDataEffects));
- }
-
- TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
- Y_UNUSED(input);
- return Promise.GetFuture();
- }
-
- TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- output = input;
-
- NKikimr::NKqp::IKqpGateway::TMkqlResult result(MkqlExecuteResult.Future.ExtractValue());
- bool success = result.Success();
-
- if (success) {
- LogMkqlResult(result.Result, ctx);
- TransformCtx->AddMkqlStats(MkqlExecuteResult.Program, std::move(result.TxStats));
- }
-
- switch (State) {
- case EFinalizeState::CommitInProgress: {
- TMaybe<TKqpTxLock> invalidatedLock;
- bool locksOk = success
- ? CheckInvalidatedLocks(result.Result, invalidatedLock)
- : true;
-
- success = success && locksOk;
-
- if (!success) {
- result.ReportIssues(ctx.IssueManager);
-
- if (!locksOk) {
- ctx.AddError(GetLocksInvalidatedIssue(TxState->Tx(), invalidatedLock));
- }
- } else {
- result.ReportIssues(ctx.IssueManager);
-
- auto mkqlResult = MakeSimpleShared<NKikimrMiniKQL::TResult>();
- mkqlResult->Swap(&result.Result);
-
- if (HasProgramResults) {
- TransformCtx->MkqlResults.push_back(mkqlResult);
- }
- }
-
- break;
- }
-
- case EFinalizeState::RollbackInProgress: {
- success = true;
- break;
- }
-
- case EFinalizeState::RollbackOnErrorInProgress: {
- success = false;
- break;
- }
-
- default:
- YQL_ENSURE(false, "Unexpected state in finalize transformer.");
- break;
- }
-
- ResetTxState(State == EFinalizeState::CommitInProgress);
- State = EFinalizeState::Initial;
-
- return success ? TStatus::Ok : TStatus::Error;
- }
-
- void Rewind() override {
- State = EFinalizeState::Initial;
- }
-
-private:
- enum class EFinalizeState {
- Initial,
- CommitInProgress,
- RollbackInProgress,
- RollbackOnErrorInProgress
- };
-
- TStatus ExecuteProgram(TKiProgram program, TExprContext& ctx, bool hasDataEffects, bool waitForResults) {
- if (waitForResults) {
- Promise = NewPromise();
- MkqlExecuteResult = ExecuteMkql(program, Gateway, Cluster, ctx, TxState, TransformCtx, hasDataEffects);
-
- auto promise = Promise;
- MkqlExecuteResult.Future.Apply([promise](const TFuture<IKqpGateway::TMkqlResult> future) mutable {
- YQL_ENSURE(future.HasValue());
- promise.SetValue();
- });
-
- return TStatus::Async;
- }
-
- YQL_ENSURE(!hasDataEffects);
-
- ExecuteMkql(program, Gateway, Cluster, ctx, TxState, TransformCtx, false);
-
- ResetTxState(true);
- State = EFinalizeState::Initial;
-
- return TStatus::Ok;
- }
-
- TStatus RollbackOnError(TExprContext& ctx) {
- YQL_ENSURE(State == EFinalizeState::Initial);
-
- YQL_CLOG(INFO, ProviderKqp) << "Rollback Tx On Error"
- << ", deferred effects count: " << TxState->Tx().DeferredEffects.Size()
- << ", locks count: " << TxState->Tx().Locks.Size();
-
- auto program = Build<TKiProgram>(ctx, TPositionHandle())
- .Results()
- .Build()
- .Effects(GetRollbackEffects(TPositionHandle(), ctx))
- .Done();
-
- if (!HasEffects(program)) {
- ResetTxState(false);
-
- return TStatus::Error;
- }
-
- State = EFinalizeState::RollbackOnErrorInProgress;
- return ExecuteProgram(program, ctx, false, true);
- }
-
- TExprList GetResultsNode(TExprBase program, TExprContext& ctx) {
- TMaybeNode<TExprList> resultsNode;
-
- if (program.Maybe<TCoWorld>()) {
- resultsNode = Build<TExprList>(ctx, program.Pos()).Done();
- } else {
- const auto& analyzeResults = TransformCtx->AnalyzeResults;
- YQL_ENSURE(analyzeResults.CanExecute);
-
- resultsNode = program.Cast<TKiProgram>().Results();
- }
-
- return resultsNode.Cast();
- }
-
- bool CheckCommitEffects(TExprBase effects, TExprContext& ctx) const {
- TIssueScopeGuard issueScope(ctx.IssueManager, [effects, &ctx]() {
- return MakeIntrusive<TIssue>(YqlIssue(ctx.GetPosition(effects.Pos()), TIssuesIds::DEFAULT_ERROR,
- "Failed to commit transaction"));
- });
-
- TMaybeNode<TExprBase> blackistedNode;
- ui32 readsCount = 0;
- VisitExpr(effects.Ptr(), [&blackistedNode, &readsCount](const TExprNode::TPtr& exprNode) {
- if (blackistedNode) {
- return false;
- }
-
- if (auto maybeCallable = TMaybeNode<TCallable>(exprNode)) {
- auto callable = maybeCallable.Cast();
-
- if (callable.CallableName() == "Udf" ||
- callable.Maybe<TKiSelectRange>())
- {
- blackistedNode = callable;
- return false;
- }
-
- if (callable.Maybe<TKiSelectRow>()) {
- ++readsCount;
- }
- }
-
- return true;
- });
-
- if (blackistedNode) {
- ctx.AddError(TIssue(ctx.GetPosition(blackistedNode.Cast().Pos()), TStringBuilder()
- << "Callable not expected in commit tx: " << blackistedNode.Cast<TCallable>().CallableName()));
- return false;
- }
-
- ui32 maxReadsCount = TransformCtx->Config->_CommitReadsLimit.Get().GetRef();
- if (readsCount > maxReadsCount) {
- ctx.AddError(TIssue(ctx.GetPosition(effects.Pos()), TStringBuilder()
- << "Reads limit exceeded in commit tx: " << readsCount << " > " << maxReadsCount));
- return false;
- }
-
- return true;
- }
-
- static bool CheckInvalidatedLocks(const NKikimrMiniKQL::TResult& result, TMaybe<TKqpTxLock>& invalidatedLock) {
- auto structType = result.GetType().GetStruct();
-
- ui32 resultIndex;
- if (!GetRunResultIndex(structType, TString(LocksInvalidatedResultName), resultIndex)) {
- return true;
- }
-
- auto locksResult = result.GetValue().GetStruct(resultIndex);
- if (locksResult.HasOptional()) {
- bool invalidated = locksResult.GetOptional().GetBool();
- YQL_ENSURE(invalidated);
-
- ui32 listIndex;
- if (GetRunResultIndex(structType, TString(LocksInvalidatedListName), listIndex)) {
- auto& locksResult = result.GetValue().GetStruct(listIndex);
- auto& list = locksResult.GetOptional().GetList();
- if (!list.empty()) {
- invalidatedLock = TKqpTxLock(list.Get(0));
- }
- }
-
- return false;
- }
-
- return true;
- }
-
- void ResetTxState(bool committed) {
- if (!committed) {
- TxState->Tx().Invalidate();
- }
-
- TxState->Tx().ClearDeferredEffects();
- TxState->Tx().Locks.Clear();
- TxState->Tx().Finish();
- }
-
- TExprBase GetRollbackEffects(TPositionHandle pos, TExprContext& ctx) {
- if (!TxState->Tx().Locks.HasLocks()) {
- return GetEmptyEffectsList(pos, ctx);
- }
-
- auto locksParamName = TxState->Tx().NewParamName();
- YQL_ENSURE(TxState->Tx().ParamsState->Values.emplace(std::make_pair(locksParamName,
- GetLocksParamValue(TxState->Tx().Locks))).second);
-
- auto locksParamNode = Build<TCoParameter>(ctx, pos)
- .Name().Build(locksParamName)
- .Type(ExpandType(pos, *GetTxLockListType(ctx), ctx))
- .Done();
-
- return GetEraseLocksEffects(Cluster, pos, locksParamNode, ctx);
- }
-
- TExprBase GetCommitEffects(TPositionHandle pos, TExprContext& ctx, bool& hasDataEffects) {
- hasDataEffects = !TxState->Tx().DeferredEffects.Empty();
-
- Y_VERIFY_DEBUG(!hasDataEffects || !TxState->Tx().Locks.Broken());
-
- auto deferredEffects = GetDeferredEffectsList(TxState->Tx().DeferredEffects, pos, ctx);
-
- if (!TxState->Tx().Locks.HasLocks()) {
- return deferredEffects;
- }
-
- auto locksParamName = TxState->Tx().NewParamName();
- YQL_ENSURE(TxState->Tx().ParamsState->Values.emplace(std::make_pair(locksParamName,
- GetLocksParamValue(TxState->Tx().Locks))).second);
-
- auto locksParamNode = Build<TCoParameter>(ctx, pos)
- .Name().Build(locksParamName)
- .Type(ExpandType(pos, *GetTxLockListType(ctx), ctx))
- .Done();
-
- if (!hasDataEffects && TxState->Tx().GetSnapshot().IsValid())
- return GetEraseLocksEffects(Cluster, pos, locksParamNode, ctx);
-
- auto lockArg = Build<TCoArgument>(ctx, pos)
- .Name("lockArg")
- .Done();
-
- auto selectLock = Build<TKiSelectRow>(ctx, pos)
- .Cluster().Build(Cluster)
- .Table<TKiVersionedTable>()
- .Path<TCoAtom>().Build(LocksTableName)
- .SchemaVersion<TCoAtom>().Build(LocksTableVersion)
- .PathId<TCoAtom>().Build(LocksTablePathId)
- .Build()
- .Key()
- .Add()
- .Name().Build("LockId")
- .Value<TCoMember>()
- .Struct(lockArg)
- .Name().Build("LockId")
- .Build()
- .Build()
- .Add()
- .Name().Build("DataShard")
- .Value<TCoMember>()
- .Struct(lockArg)
- .Name().Build("DataShard")
- .Build()
- .Build()
- .Add()
- .Name().Build("SchemeShard")
- .Value<TCoMember>()
- .Struct(lockArg)
- .Name().Build("SchemeShard")
- .Build()
- .Build()
- .Add()
- .Name().Build("PathId")
- .Value<TCoMember>()
- .Struct(lockArg)
- .Name().Build("PathId")
- .Build()
- .Build()
- .Build()
- .Select()
- .Add().Build("Generation")
- .Add().Build("Counter")
- .Build()
- .Done();
-
- TVector<TExprBase> args = {
- Build<TCoCmpEqual>(ctx, pos)
- .Left<TCoMember>()
- .Struct(selectLock)
- .Name().Build("Generation")
- .Build()
- .Right<TCoMember>()
- .Struct(lockArg)
- .Name().Build("Generation")
- .Build()
- .Done(),
- Build<TCoCmpEqual>(ctx, pos)
- .Left<TCoMember>()
- .Struct(selectLock)
- .Name().Build("Counter")
- .Build()
- .Right<TCoMember>()
- .Struct(lockArg)
- .Name().Build("Counter")
- .Build()
- .Done()
- };
- auto lockPredicate = Build<TCoNot>(ctx, pos)
- .Value<TCoCoalesce>()
- .Predicate<TCoAnd>()
- .Add(args)
- .Build()
- .Value<TCoBool>()
- .Literal().Build("false")
- .Build()
- .Build()
- .Done();
-
- auto locksInvalidatedList = Build<TKiFlatMapParameter>(ctx, pos)
- .Input(locksParamNode)
- .Lambda()
- .Args(lockArg)
- .Body<TCoListIf>()
- .Predicate(lockPredicate)
- .Value(lockArg)
- .Build()
- .Build()
- .Done();
-
- auto locksInvalidatedPredicate = Build<TCoHasItems>(ctx, pos)
- .List(locksInvalidatedList)
- .Done();
-
- auto effects = Build<TCoExtend>(ctx, pos)
- .Add<TCoIf>()
- .Predicate(locksInvalidatedPredicate)
- .ThenValue<TCoAsList>()
- .Add<TKiSetResult>()
- .Name().Build(LocksInvalidatedResultName)
- .Data(locksInvalidatedPredicate)
- .Build()
- .Add<TKiSetResult>()
- .Name().Build(LocksInvalidatedListName)
- .Data<TCoTake>()
- .Input(locksInvalidatedList)
- .Count<TCoUint64>()
- .Literal().Build(ToString(LocksInvalidatedCount))
- .Build()
- .Build()
- .Build()
- .Build()
- .ElseValue(deferredEffects)
- .Build()
- .Add(GetEraseLocksEffects(Cluster, pos, locksParamNode, ctx))
- .Done();
-
- return effects;
- }
-
- bool ShouldWaitForResults(bool hasDataEffects) {
- if (State != EFinalizeState::CommitInProgress) {
- return true;
- }
-
- if (hasDataEffects) {
- return true;
- }
-
- if (!TxState->Tx().GetSnapshot().IsValid()) {
- return true;
- }
-
- if (HasProgramResults) {
- return true;
- }
-
- return false;
- }
-
-private:
- TIntrusivePtr<IKqpGateway> Gateway;
- TString Cluster;
- TIntrusivePtr<TKqpTransactionState> TxState;
- TIntrusivePtr<TKqlTransformContext> TransformCtx;
- EFinalizeState State;
- bool HasProgramResults;
- TMkqlExecuteResult MkqlExecuteResult;
- TPromise<void> Promise;
-};
-
-} // namespace
-
-TCoList GetEmptyEffectsList(const TPositionHandle pos, TExprContext& ctx) {
- return Build<TCoList>(ctx, pos)
- .ListType<TCoListType>()
- .ItemType<TCoVoidType>()
- .Build()
- .Build()
- .Done();
-}
-
-NKikimrMiniKQL::TParams GetLocksParamValue(const TKqpTxLocks& locks) {
- YQL_ENSURE(locks.HasLocks());
-
- NKikimrMiniKQL::TParams locksResult;
- auto type = locksResult.MutableType();
- type->SetKind(NKikimrMiniKQL::ETypeKind::List);
- type->MutableList()->CopyFrom(locks.LocksListType);
- auto value = locksResult.MutableValue();
- for (auto& pair : locks.LocksMap) {
- auto lockValue = value->AddList();
- lockValue->CopyFrom(pair.second.GetValue());
- }
-
- return locksResult;
-}
-
-TAutoPtr<IGraphTransformer> CreateKqpFinalizeTransformer(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
- TIntrusivePtr<TKqpTransactionState> txState, TIntrusivePtr<TKqlTransformContext> transformCtx)
-{
- return new TKqpFinalizeTransformer(gateway, cluster, txState, transformCtx);
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/prepare/kqp_query_plan.cpp b/ydb/core/kqp/prepare/kqp_query_plan.cpp
index cfbd16d0f99..068f352dbfb 100644
--- a/ydb/core/kqp/prepare/kqp_query_plan.cpp
+++ b/ydb/core/kqp/prepare/kqp_query_plan.cpp
@@ -1,4 +1,6 @@
-#include "kqp_prepare_impl.h"
+#include "kqp_prepare.h"
+
+#include "kqp_query_plan.h"
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
@@ -30,7 +32,7 @@ using namespace NClient;
namespace {
struct TTableRead {
- ETableReadType Type = ETableReadType::Unspecified;
+ EPlanTableReadType Type = EPlanTableReadType::Unspecified;
TVector<TString> LookupBy;
TVector<TString> ScanBy;
TVector<TString> Columns;
@@ -39,7 +41,7 @@ struct TTableRead {
};
struct TTableWrite {
- ETableWriteType Type = ETableWriteType::Unspecified;
+ EPlanTableWriteType Type = EPlanTableWriteType::Unspecified;
TVector<TString> Keys;
TVector<TString> Columns;
};
@@ -49,6 +51,17 @@ struct TTableInfo {
TVector<TTableWrite> Writes;
};
+struct TExprScope {
+ NYql::NNodes::TCallable Callable;
+ NYql::NNodes::TCoLambda Lambda;
+ ui32 Depth;
+
+ TExprScope(NYql::NNodes::TCallable callable, NYql::NNodes::TCoLambda Lambda, ui32 depth)
+ : Callable(callable)
+ , Lambda(Lambda)
+ , Depth(depth) {}
+};
+
struct TSerializerCtx {
TSerializerCtx(TExprContext& exprCtx, const TString& cluster,
const TIntrusivePtr<NYql::TKikimrTablesData> tablesData,
@@ -211,12 +224,12 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab
if (read.LookupBy.empty()) {
read.Type = TKikimrKeyRange::IsFull(selectRange.Range())
- ? ETableReadType::FullScan
- : ETableReadType::Scan;
+ ? EPlanTableReadType::FullScan
+ : EPlanTableReadType::Scan;
} else {
read.Type = mapDepth > 0
- ? ETableReadType::MultiLookup
- : ETableReadType::Lookup;
+ ? EPlanTableReadType::MultiLookup
+ : EPlanTableReadType::Lookup;
}
tables[TString(selectRange.Table().Path())].Reads.push_back(read);
@@ -227,8 +240,8 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab
TTableRead read;
read.Type = mapDepth > 0
- ? ETableReadType::MultiLookup
- : ETableReadType::Lookup;
+ ? EPlanTableReadType::MultiLookup
+ : EPlanTableReadType::Lookup;
for (const auto& key : selectRow.Key()) {
auto lookup = TStringBuilder() << key.Name().Value()
@@ -248,8 +261,8 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab
TTableWrite write;
write.Type = mapDepth > 0
- ? ETableWriteType::MultiUpsert
- : ETableWriteType::Upsert;
+ ? EPlanTableWriteType::MultiUpsert
+ : EPlanTableWriteType::Upsert;
for (const auto& tuple : updateRow.Key()) {
@@ -270,8 +283,8 @@ void FillTablesInfo(const TExprNode::TPtr& query, TMap<TString, TTableInfo>& tab
TTableWrite write;
write.Type = mapDepth > 0
- ? ETableWriteType::MultiErase
- : ETableWriteType::Erase;
+ ? EPlanTableWriteType::MultiErase
+ : EPlanTableWriteType::Erase;
for (const auto& tuple : eraseRow.Key()) {
auto key = TStringBuilder() << tuple.Name().Value()
@@ -683,7 +696,7 @@ private:
auto tableLookup = maybeTableLookup.Cast();
TTableRead readInfo;
- readInfo.Type = ETableReadType::Lookup;
+ readInfo.Type = EPlanTableReadType::Lookup;
planNode.TypeName = "TableLookup";
TString table(tableLookup.Table().Path().Value());
auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table);
@@ -935,7 +948,7 @@ private:
op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
TTableWrite writeInfo;
- writeInfo.Type = ETableWriteType::MultiUpsert;
+ writeInfo.Type = EPlanTableWriteType::MultiUpsert;
for (const auto& column : upsert.Columns()) {
writeInfo.Columns.push_back(TString(column.Value()));
}
@@ -954,7 +967,7 @@ private:
op.Properties["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
TTableWrite writeInfo;
- writeInfo.Type = ETableWriteType::MultiErase;
+ writeInfo.Type = EPlanTableWriteType::MultiErase;
SerializerCtx.Tables[table].Writes.push_back(writeInfo);
planNode.NodeInfo["Tables"].AppendValue(op.Properties["Table"]);
@@ -1029,7 +1042,7 @@ private:
ui32 Visit(const TKqlLookupTableBase& lookup, TQueryPlanNode& planNode) {
auto table = TString(lookup.Table().Path().Value());
TTableRead readInfo;
- readInfo.Type = ETableReadType::Lookup;
+ readInfo.Type = EPlanTableReadType::Lookup;
TOperator op;
op.Properties["Name"] = "TablePointLookup";
@@ -1059,7 +1072,7 @@ private:
auto rangesDesc = PrettyExprStr(read.Ranges());
if (rangesDesc == "Void" || explainPrompt.UsedKeyColumns.empty()) {
- readInfo.Type = ETableReadType::FullScan;
+ readInfo.Type = EPlanTableReadType::FullScan;
auto& ranges = op.Properties["ReadRanges"];
for (const auto& col : tableData.Metadata->KeyColumnNames) {
@@ -1069,7 +1082,7 @@ private:
ranges.AppendValue(rangeDesc);
}
} else if (auto maybeResultBinding = ContainResultBinding(rangesDesc)) {
- readInfo.Type = ETableReadType::Scan;
+ readInfo.Type = EPlanTableReadType::Scan;
auto [txId, resId] = *maybeResultBinding;
if (auto result = GetResult(txId, resId)) {
@@ -1143,7 +1156,7 @@ private:
}
ui32 operatorId;
- if (readInfo.Type == ETableReadType::FullScan) {
+ if (readInfo.Type == EPlanTableReadType::FullScan) {
op.Properties["Name"] = "TableFullScan";
operatorId = AddOperator(planNode, "TableFullScan", std::move(op));
} else {
@@ -1244,9 +1257,9 @@ private:
// Scan which fixes only few first members of compound primary key were called "Lookup"
// by older explain version. We continue to do so.
if (readInfo.LookupBy.size() > 0) {
- readInfo.Type = ETableReadType::Lookup;
+ readInfo.Type = EPlanTableReadType::Lookup;
} else {
- readInfo.Type = hasRangeScans ? ETableReadType::Scan : ETableReadType::FullScan;
+ readInfo.Type = hasRangeScans ? EPlanTableReadType::Scan : EPlanTableReadType::FullScan;
}
}
@@ -1277,13 +1290,13 @@ private:
SerializerCtx.Tables[table].Reads.push_back(readInfo);
ui32 operatorId;
- if (readInfo.Type == ETableReadType::Scan) {
+ if (readInfo.Type == EPlanTableReadType::Scan) {
op.Properties["Name"] = "TableRangeScan";
operatorId = AddOperator(planNode, "TableRangeScan", std::move(op));
- } else if (readInfo.Type == ETableReadType::FullScan) {
+ } else if (readInfo.Type == EPlanTableReadType::FullScan) {
op.Properties["Name"] = "TableFullScan";
operatorId = AddOperator(planNode, "TableFullScan", std::move(op));
- } else if (readInfo.Type == ETableReadType::Lookup) {
+ } else if (readInfo.Type == EPlanTableReadType::Lookup) {
op.Properties["Name"] = "TablePointLookup";
operatorId = AddOperator(planNode, "TablePointLookup", std::move(op));
} else {
diff --git a/ydb/core/kqp/prepare/kqp_query_plan.h b/ydb/core/kqp/prepare/kqp_query_plan.h
index 71af7c99cc9..828cb5ce5c6 100644
--- a/ydb/core/kqp/prepare/kqp_query_plan.h
+++ b/ydb/core/kqp/prepare/kqp_query_plan.h
@@ -12,6 +12,22 @@
namespace NKikimr {
namespace NKqp {
+enum class EPlanTableReadType {
+ Unspecified,
+ FullScan,
+ Scan,
+ Lookup,
+ MultiLookup,
+};
+
+enum class EPlanTableWriteType {
+ Unspecified,
+ Upsert,
+ MultiUpsert,
+ Erase,
+ MultiErase,
+};
+
void WriteKqlPlan(NJsonWriter::TBuf& writer, const NYql::TExprNode::TPtr& query);
/*
diff --git a/ydb/core/kqp/prepare/kqp_query_rewrite.cpp b/ydb/core/kqp/prepare/kqp_query_rewrite.cpp
deleted file mode 100644
index cb24a1b8a1d..00000000000
--- a/ydb/core/kqp/prepare/kqp_query_rewrite.cpp
+++ /dev/null
@@ -1,398 +0,0 @@
-#include "kqp_prepare_impl.h"
-
-#include <ydb/library/yql/core/yql_expr_optimize.h>
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NYql;
-using namespace NYql::NNodes;
-
-namespace {
-
-template<typename TMapNode>
-TExprBase RebuildMapToList(TMapNode map, TExprContext& ctx) {
- if (map.Lambda().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List &&
- map.Input().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List)
- {
- return map;
- }
-
- bool isOptional = map.Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional;
-
- if (map.Lambda().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) {
- auto newBody = Build<TCoToList>(ctx, map.Pos())
- .Optional(map.Lambda().Body())
- .Done();
-
- map = Build<TMapNode>(ctx, map.Pos())
- .Input(map.Input())
- .Lambda()
- .Args({"item"})
- .template Body<TExprApplier>()
- .Apply(newBody)
- .With(map.Lambda().Args().Arg(0), "item")
- .Build()
- .Build()
- .Done();
- }
-
- if (map.Input().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional) {
- map = Build<TMapNode>(ctx, map.Pos())
- .template Input<TCoToList>()
- .Optional(map.Input())
- .Build()
- .Lambda(map.Lambda())
- .Done();
- }
-
- if (isOptional) {
- return Build<TCoToOptional>(ctx, map.Pos())
- .List(map)
- .Done();
- }
-
- YQL_CLOG(INFO, ProviderKqp) << "RebuildMapToList";
- return map;
-}
-
-TExprNode::TPtr NormalizeCallables(TExprBase node, TExprContext& ctx, const TKqpAnalyzeResults& analyzeResults) {
- if (!analyzeResults.CallableToExecRootsMap.contains(node.Raw())) {
- return node.Ptr();
- }
-
- if (node.Maybe<TCoMap>() || node.Maybe<TCoFlatMap>()) {
- return node.Maybe<TCoMap>()
- ? RebuildMapToList<TCoMap>(node.Cast<TCoMap>(), ctx).Ptr()
- : RebuildMapToList<TCoFlatMap>(node.Cast<TCoFlatMap>(), ctx).Ptr();
- }
-
- if (auto filter = node.Maybe<TCoFilter>()) {
- YQL_CLOG(INFO, ProviderKqp) << "NormalizeCallables: Filter";
- return Build<TCoFlatMap>(ctx, node.Pos())
- .Input(filter.Cast().Input())
- .Lambda()
- .Args({"item"})
- .Body<TCoIf>()
- .Predicate<TExprApplier>()
- .Apply(filter.Cast().Lambda())
- .With(0, "item")
- .Build()
- .ThenValue<TCoJust>()
- .Input("item")
- .Build()
- .ElseValue<TCoNothing>()
- .OptionalType<TCoOptionalType>()
- .ItemType<TCoTypeOf>()
- .Value("item")
- .Build()
- .Build()
- .Build()
- .Build()
- .Build()
- .Done()
- .Ptr();
- }
-
- return node.Ptr();
-}
-
-TExprNode::TPtr ToListOverToOptional(TExprBase node, TExprContext& ctx) {
- Y_UNUSED(ctx);
-
- if (auto toList = node.Maybe<TCoToList>()) {
- if (auto toOpt = toList.Cast().Optional().Maybe<TCoToOptional>()) {
- YQL_CLOG(INFO, ProviderKqp) << "ToListOverToOptional";
- return toOpt.Cast().List().Ptr();
- }
- if (auto toOpt = toList.Cast().Optional().Maybe<TCoHead>()) {
- YQL_CLOG(INFO, ProviderKqp) << "ToListOverHead";
- return toOpt.Cast().Input().Ptr();
- }
- if (auto toOpt = toList.Cast().Optional().Maybe<TCoLast>()) {
- YQL_CLOG(INFO, ProviderKqp) << "ToListOverLast";
- return toOpt.Cast().Input().Ptr();
- }
- }
-
- return node.Ptr();
-}
-
-template<typename TInner, typename TOuter>
-TExprBase SplitMap(TExprBase input, TCoLambda lambda, TExprContext& ctx, const TVector<TExprBase> execRoots,
- const TKqpAnalyzeResults& analyzeResults)
-{
- auto exprRootsTuple = Build<TExprList>(ctx, lambda.Pos())
- .Add(execRoots)
- .Done();
-
- auto isSameScope = [lambda] (const TNodeInfo* nodeInfo) {
- if (!nodeInfo) {
- return true;
- }
-
- if (!nodeInfo->Scope || nodeInfo->Scope->Lambda.Raw() != lambda.Raw()) {
- return false;
- }
-
- return true;
- };
-
- THashSet<const TExprNode*> innerNodes;
- innerNodes.insert(lambda.Args().Arg(0).Raw());
- VisitExpr(exprRootsTuple.Ptr(),
- [&innerNodes, &isSameScope, &analyzeResults] (const TExprNode::TPtr& node) {
- auto* nodeInfo = analyzeResults.ExprToNodeInfoMap.FindPtr(node.Get());
-
- if (!isSameScope(nodeInfo)) {
- return false;
- }
-
- if (node->IsCallable()) {
- innerNodes.insert(node.Get());
- }
-
- return true;
- });
-
- THashMap<const TExprNode*, TExprBase> jointsMap;
- for (TExprBase root : execRoots) {
- jointsMap.insert(std::make_pair(root.Raw(), root));
- }
-
- VisitExpr(lambda.Body().Ptr(),
- [&innerNodes, &jointsMap, isSameScope, &analyzeResults] (const TExprNode::TPtr& node) {
- auto* nodeInfo = analyzeResults.ExprToNodeInfoMap.FindPtr(node.Get());
- YQL_ENSURE(nodeInfo);
-
- YQL_ENSURE(node->GetTypeAnn());
- bool isComputable = node->IsComputable();
-
- if (innerNodes.contains(node.Get())) {
- if (isComputable) {
- jointsMap.insert(std::make_pair(node.Get(), TExprBase(node)));
- }
-
- return false;
- }
-
- bool hasInnerChild = false;
- if (isComputable && !node->Children().empty() && nodeInfo->IsExecutable) {
- for (const auto& child : node->Children()) {
- if (jointsMap.contains(child.Get())) {
- return true;
- }
-
- bool innerChild = innerNodes.contains(child.Get());
- hasInnerChild = hasInnerChild | innerChild;
-
- YQL_ENSURE(child->GetTypeAnn());
- if (child->GetTypeAnn()->IsComputable() && !innerChild) {
- return true;
- }
- }
-
- if (hasInnerChild) {
- jointsMap.insert(std::make_pair(node.Get(), TExprBase(node)));
- return false;
- }
- }
-
- return true;
- });
-
- TVector<TExprBase> jointNodes;
- jointNodes.reserve(jointsMap.size());
- for (auto& pair : jointsMap) {
- jointNodes.push_back(pair.second);
- }
-
- auto innerLambdaBody = Build<TExprList>(ctx, lambda.Pos())
- .Add(jointNodes)
- .Done();
-
- auto innerLambda = Build<TCoLambda>(ctx, lambda.Pos())
- .Args({"item"})
- .Body<TExprApplier>()
- .Apply(innerLambdaBody)
- .With(lambda.Args().Arg(0), "item")
- .Build()
- .Done();
-
- TCoArgument outerLambdaArg = Build<TCoArgument>(ctx, lambda.Pos())
- .Name("item")
- .Done();
-
- TNodeOnNodeOwnedMap replaceMap;
- for (size_t i = 0; i < jointNodes.size(); ++i) {
- auto node = Build<TCoNth>(ctx, lambda.Pos())
- .Tuple(outerLambdaArg)
- .Index().Build(i)
- .Done();
-
- replaceMap.emplace(jointNodes[i].Raw(), node.Ptr());
- }
-
- auto outerLambdaBody = ctx.ReplaceNodes(lambda.Body().Ptr(), replaceMap);
- auto outerLambda = Build<TCoLambda>(ctx, lambda.Pos())
- .Args({outerLambdaArg})
- .Body(TExprBase(outerLambdaBody))
- .Done();
-
- return Build<TOuter>(ctx, input.Pos())
- .template Input<TInner>()
- .Input(input)
- .Lambda(innerLambda)
- .Build()
- .Lambda(outerLambda)
- .Done();
-}
-
-TExprNode::TPtr SplitMap(TExprBase mapNode, TExprContext& ctx, const TVector<TExprBase>& execRoots,
- const TKqpAnalyzeResults& analyzeResults)
-{
- YQL_ENSURE(mapNode.Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List);
-
- if (auto map = mapNode.Maybe<TCoMap>()) {
- YQL_CLOG(INFO, ProviderKqp) << "SplitMap: Map, MapParameter";
- return SplitMap<TCoMap, TKiMapParameter>(map.Cast().Input(), map.Cast().Lambda(), ctx, execRoots,
- analyzeResults).Ptr();
- }
-
- if (auto map = mapNode.Maybe<TCoFlatMap>()) {
- YQL_CLOG(INFO, ProviderKqp) << "SplitMap: Map, FlatMapParameter";
- YQL_ENSURE(map.Cast().Lambda().Ptr()->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List);
- return SplitMap<TCoMap, TKiFlatMapParameter>(map.Cast().Input(), map.Cast().Lambda(), ctx, execRoots,
- analyzeResults).Ptr();
- }
-
- if (auto map = mapNode.Maybe<TKiMapParameter>()) {
- YQL_CLOG(INFO, ProviderKqp) << "SplitMap: MapParameter, MapParameter";
- return SplitMap<TKiMapParameter, TKiMapParameter>(map.Cast().Input(), map.Cast().Lambda(),
- ctx, execRoots, analyzeResults).Ptr();
- }
-
- if (auto map = mapNode.Maybe<TKiFlatMapParameter>()) {
- YQL_CLOG(INFO, ProviderKqp) << "SplitMap: MapParameter, FlatMapParameter";
- return SplitMap<TKiMapParameter, TKiFlatMapParameter>(map.Cast().Input(), map.Cast().Lambda(),
- ctx, execRoots, analyzeResults).Ptr();
- }
-
- YQL_ENSURE(false);
- return nullptr;
-}
-
-TExprNode::TPtr UnnestExecutionRoots(TExprBase node, TExprContext& ctx, const TKqpAnalyzeResults& analyzeResults) {
- auto execRoots = analyzeResults.CallableToExecRootsMap.FindPtr(node.Raw());
-
- if (!execRoots) {
- return node.Ptr();
- }
-
- if (auto maybeMap = node.Maybe<TCoMap>()) {
- auto map = maybeMap.Cast();
- if (map.Input().Maybe<TCoParameter>()) {
- YQL_CLOG(INFO, ProviderKqp) << "UnnestExecutionRoots: Map parameter";
- return Build<TKiMapParameter>(ctx, node.Pos())
- .Input(map.Input())
- .Lambda(map.Lambda())
- .Done()
- .Ptr();
- }
- }
-
- if (auto maybeFlatMap = node.Maybe<TCoFlatMap>()) {
- auto flatMap = maybeFlatMap.Cast();
- if (flatMap.Input().Maybe<TCoParameter>()) {
- YQL_CLOG(INFO, ProviderKqp) << "UnnestExecutionRoots: FlatMap parameter";
- return Build<TKiFlatMapParameter>(ctx, node.Pos())
- .Input(flatMap.Input())
- .Lambda(flatMap.Lambda())
- .Done()
- .Ptr();
- }
- }
-
- if (node.Maybe<TCoMap>() ||
- node.Maybe<TCoFlatMap>() ||
- node.Maybe<TKiMapParameter>() ||
- node.Maybe<TKiFlatMapParameter>())
- {
- return SplitMap(node, ctx, *execRoots, analyzeResults);
- }
-
- ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
- << "Can't rewrite callable for KQP execution: " << node.Ptr()->Content()));
- YQL_ENSURE(false);
- return nullptr;
-}
-
-class TKqpRewriteTransformer : public TSyncTransformerBase {
-public:
- TKqpRewriteTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx)
- : TransformCtx(transformCtx) {}
-
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- const auto& analyzeResults = TransformCtx->AnalyzeResults;
-
- if (analyzeResults.CallableToExecRootsMap.empty()) {
- return TStatus::Ok;
- }
-
- TOptimizeExprSettings optSettings(nullptr);
- optSettings.VisitChanges = true;
- TStatus status = TStatus::Error;
-
- status = OptimizeExpr(input, output,
- [&analyzeResults](const TExprNode::TPtr& input, TExprContext& ctx) {
- auto ret = input;
- TExprBase node(input);
-
- ret = NormalizeCallables(node, ctx, analyzeResults);
- if (ret != input) {
- return ret;
- }
-
- ret = ToListOverToOptional(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- return ret;
- }, ctx, optSettings);
- YQL_ENSURE(status == TStatus::Ok);
-
- if (input != output) {
- return TStatus(TStatus::Repeat, true);
- }
-
- status = OptimizeExpr(input, output,
- [&analyzeResults](const TExprNode::TPtr& input, TExprContext& ctx) {
- auto ret = input;
- TExprBase node(input);
-
- ret = UnnestExecutionRoots(node, ctx, analyzeResults);
- if (ret != input) {
- return ret;
- }
-
- return ret;
- }, ctx, optSettings);
- YQL_ENSURE(status == TStatus::Ok);
-
- YQL_ENSURE(input != output);
- return TStatus(TStatus::Repeat, true);
- }
-
-private:
- TIntrusivePtr<TKqlTransformContext> TransformCtx;
-};
-
-} // namespace
-
-TAutoPtr<IGraphTransformer> CreateKqpRewriteTransformer(TIntrusivePtr<TKqlTransformContext> transformCtx) {
- return new TKqpRewriteTransformer(transformCtx);
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/prepare/kqp_query_simplify.cpp b/ydb/core/kqp/prepare/kqp_query_simplify.cpp
deleted file mode 100644
index d8fefeb9393..00000000000
--- a/ydb/core/kqp/prepare/kqp_query_simplify.cpp
+++ /dev/null
@@ -1,372 +0,0 @@
-#include "kqp_prepare_impl.h"
-
-#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
-
-#include <ydb/library/yql/core/yql_expr_optimize.h>
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NYql;
-using namespace NYql::NNodes;
-
-namespace {
-
-template<typename TMapNode>
-TExprBase BuildMap(TExprBase input, TExprBase body, TCoArgument arg, TExprContext& ctx) {
- return Build<TMapNode>(ctx, input.Pos())
- .Input(input)
- .Lambda()
- .Args({"item"})
- .template Body<TExprApplier>()
- .Apply(body)
- .With(arg, "item")
- .Build()
- .Build()
- .Done();
-}
-
-TExprNode::TPtr ExtractFilter(TExprBase node, TExprContext& ctx) {
- if (!node.Maybe<TCoFlatMap>()) {
- return node.Ptr();
- }
-
- auto flatmap = node.Cast<TCoFlatMap>();
- auto flatmapArg = flatmap.Lambda().Args().Arg(0);
-
- if (flatmap.Input().Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::List) {
- return node.Ptr();
- }
-
- if (auto maybeConditional = flatmap.Lambda().Body().Maybe<TCoConditionalValueBase>()) {
- auto conditional = maybeConditional.Cast();
-
- auto blacklistedNode = FindNode(conditional.Predicate().Ptr(), [](const TExprNode::TPtr& exprNode) {
- auto node = TExprBase(exprNode);
-
- if (node.Maybe<TKiSelectRow>() || node.Maybe<TKiSelectRangeBase>()) {
- return true;
- }
-
- return false;
- });
-
- if (blacklistedNode) {
- return node.Ptr();
- }
-
- auto filter = Build<TCoFilter>(ctx, node.Pos())
- .Input(flatmap.Input())
- .Lambda()
- .Args({"item"})
- .Body<TExprApplier>()
- .Apply(conditional.Predicate())
- .With(flatmapArg, "item")
- .Build()
- .Build()
- .Done();
-
- auto value = conditional.Value();
-
- if (conditional.Maybe<TCoListIf>() || conditional.Maybe<TCoOptionalIf>()) {
- return BuildMap<TCoMap>(filter, value, flatmapArg, ctx).Ptr();
- } else {
- return BuildMap<TCoFlatMap>(filter, value, flatmapArg, ctx).Ptr();
- }
- }
-
- if (auto maybeConditional = flatmap.Lambda().Body().Maybe<TCoIf>()) {
- auto conditional = maybeConditional.Cast();
-
- auto elseValue = conditional.ElseValue();
- if (!elseValue.Maybe<TCoList>() && !elseValue.Maybe<TCoNothing>()) {
- return node.Ptr();
- }
-
- auto blacklistedNode = FindNode(conditional.Predicate().Ptr(), [](const TExprNode::TPtr& exprNode) {
- auto node = TExprBase(exprNode);
-
- if (node.Maybe<TKiSelectRow>() || node.Maybe<TKiSelectRangeBase>()) {
- return true;
- }
-
- return false;
- });
-
- if (blacklistedNode) {
- return node.Ptr();
- }
-
- auto filter = Build<TCoFilter>(ctx, node.Pos())
- .Input(flatmap.Input())
- .Lambda()
- .Args({"item"})
- .Body<TExprApplier>()
- .Apply(conditional.Predicate())
- .With(flatmapArg, "item")
- .Build()
- .Build()
- .Done();
-
- auto value = conditional.ThenValue();
-
- return BuildMap<TCoFlatMap>(filter, value, flatmapArg, ctx).Ptr();
- }
-
- return node.Ptr();
-}
-
-TExprNode::TPtr ExtractCombineByKeyPreMap(TExprBase node, TExprContext& ctx) {
- if (auto maybeCombine = node.Maybe<TCoCombineByKey>()) {
- auto combine = maybeCombine.Cast();
- if (!IsKqlPureLambda(combine.PreMapLambda())) {
- return Build<TCoCombineByKey>(ctx, node.Pos())
- .Input<TCoMap>()
- .Input(combine.Input())
- .Lambda(combine.PreMapLambda())
- .Build()
- .PreMapLambda()
- .Args({"item"})
- .Body("item")
- .Build()
- .KeySelectorLambda(combine.KeySelectorLambda())
- .InitHandlerLambda(combine.InitHandlerLambda())
- .UpdateHandlerLambda(combine.UpdateHandlerLambda())
- .FinishHandlerLambda(combine.FinishHandlerLambda())
- .Done()
- .Ptr();
- }
- }
-
- return node.Ptr();
-}
-
-template <class TPartitionsType>
-TExprNode::TPtr ExtractPartitionByKeyListHandler(TExprBase node, TExprContext& ctx) {
- if (auto maybePartition = node.Maybe<TPartitionsType>()) {
- auto partition = maybePartition.Cast();
- if (!IsKqlPureLambda(partition.ListHandlerLambda())) {
- auto newPartition = Build<TPartitionsType>(ctx, node.Pos())
- .Input(partition.Input())
- .KeySelectorLambda(partition.KeySelectorLambda())
- .SortDirections(partition.SortDirections())
- .SortKeySelectorLambda(partition.SortKeySelectorLambda())
- .ListHandlerLambda()
- .Args({"list"})
- .Body("list")
- .Build()
- .Done();
-
- return Build<TExprApplier>(ctx, node.Pos())
- .Apply(partition.ListHandlerLambda().Body())
- .With(partition.ListHandlerLambda().Args().Arg(0), newPartition)
- .Done()
- .Ptr();
- }
- }
-
- return node.Ptr();
-}
-
-template<typename TMapType>
-TExprNode::TPtr MergeMapsWithSameLambda(TExprBase node, TExprContext& ctx) {
- if (!node.Maybe<TCoExtend>()) {
- return node.Ptr();
- }
-
- bool hasInputsToMerge = false;
- TVector<std::pair<TMaybeNode<TCoLambda>, TVector<TExprBase>>> inputs;
-
- auto extend = node.Cast<TCoExtend>();
- for (const auto& list : extend) {
- TMaybeNode<TExprBase> input;
- TMaybeNode<TCoLambda> lambda;
-
- bool buildList = false;
- if (auto maybeMap = list.Maybe<TMapType>()) {
- lambda = maybeMap.Cast().Lambda();
- input = maybeMap.Cast().Input();
-
- YQL_ENSURE(input.Ref().GetTypeAnn());
- buildList = input.Cast().Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Optional;
- } else if (auto maybeMap = list.Maybe<TCoToList>().Optional().Maybe<TMapType>()) {
- lambda = maybeMap.Cast().Lambda();
- input = maybeMap.Cast().Input();
- buildList = true;
- }
-
- if (buildList) {
- input = Build<TCoToList>(ctx, node.Pos())
- .Optional(input.Cast())
- .Done();
- }
-
- if (lambda && !IsKqlPureLambda(lambda.Cast())) {
- if (!inputs.empty() && inputs.back().first && inputs.back().first.Cast().Raw() == lambda.Cast().Raw()) {
- inputs.back().second.push_back(input.Cast());
- hasInputsToMerge = true;
- } else {
- inputs.emplace_back(lambda, TVector<TExprBase>{input.Cast()});
- }
- } else {
- inputs.emplace_back(TMaybeNode<TCoLambda>(), TVector<TExprBase>{list});
- }
- }
-
- if (!hasInputsToMerge) {
- return node.Ptr();
- }
-
- TVector<TExprBase> merged;
- for (const auto& pair : inputs) {
- const auto& lambda = pair.first;
- const auto& lists = pair.second;
-
- if (lambda) {
- YQL_ENSURE(!lists.empty());
- auto mergedInput = lists.front();
-
- if (lists.size() > 1) {
- mergedInput = Build<TCoExtend>(ctx, node.Pos())
- .Add(lists)
- .Done();
- }
-
- auto mergedMap = Build<TMapType>(ctx, node.Pos())
- .Input(mergedInput)
- .Lambda()
- .Args({"item"})
- .template Body<TExprApplier>()
- .Apply(lambda.Cast())
- .With(0, "item")
- .Build()
- .Build()
- .Done();
-
- merged.push_back(mergedMap);
- } else {
- YQL_ENSURE(lists.size() == 1);
- merged.push_back(lists.front());
- }
- }
-
- YQL_ENSURE(!merged.empty());
- auto ret = merged.front();
- if (merged.size() > 1) {
- ret = Build<TCoExtend>(ctx, node.Pos())
- .Add(merged)
- .Done();
- }
-
- return ret.Ptr();
-}
-
-TExprNode::TPtr RewritePresentIfToFlatMap(TExprBase node, TExprContext& ctx) {
- if (!node.Maybe<TCoIfPresent>()) {
- return node.Ptr();
- }
- auto ifPresent = node.Cast<TCoIfPresent>();
-
- if (IsKqlPureLambda(ifPresent.PresentHandler())) {
- return node.Ptr();
- }
-
- if (!ifPresent.MissingValue().Maybe<TCoNothing>()) {
- return node.Ptr();
- }
-
- return Build<TCoFlatMap>(ctx, node.Pos())
- .Input(ifPresent.Optional())
- .Lambda()
- .Args({"item"})
- .Body<TExprApplier>()
- .Apply(ifPresent.PresentHandler())
- .With(ifPresent.PresentHandler().Args().Arg(0), "item")
- .Build()
- .Build()
- .Done()
- .Ptr();
-}
-
-class TKqpSimplifyTransformer : public TSyncTransformerBase {
-public:
- TKqpSimplifyTransformer()
- : Simplified(false) {}
-
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- if (Simplified) {
- return TStatus::Ok;
- }
-
- TOptimizeExprSettings optSettings(nullptr);
- optSettings.VisitChanges = false;
- TStatus status = OptimizeExpr(input, output,
- [](const TExprNode::TPtr& input, TExprContext& ctx) {
- auto ret = input;
- TExprBase node(input);
-
- ret = MergeMapsWithSameLambda<TCoMap>(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- ret = MergeMapsWithSameLambda<TCoFlatMap>(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- ret = ExtractFilter(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- ret = ExtractCombineByKeyPreMap(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- ret = ExtractPartitionByKeyListHandler<TCoPartitionByKey>(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- ret = ExtractPartitionByKeyListHandler<TCoPartitionsByKeys>(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- ret = RewritePresentIfToFlatMap(node, ctx);
- if (ret != input) {
- return ret;
- }
-
- return ret;
- }, ctx, optSettings);
-
- if (input != output) {
- return TStatus(TStatus::Repeat, true);
- }
-
- if (status == TStatus::Ok) {
- Simplified = true;
- }
-
- return status;
- }
-
- void Rewind() override {
- Simplified = false;
- }
-
-private:
- bool Simplified;
-};
-
-} // namespace
-
-TAutoPtr<IGraphTransformer> CreateKqpSimplifyTransformer() {
- return new TKqpSimplifyTransformer();
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/prepare/kqp_query_substitute.cpp b/ydb/core/kqp/prepare/kqp_query_substitute.cpp
deleted file mode 100644
index 6d289bf78b9..00000000000
--- a/ydb/core/kqp/prepare/kqp_query_substitute.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-#include "kqp_prepare_impl.h"
-
-#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
-
-#include <ydb/library/yql/core/yql_expr_optimize.h>
-
-namespace NKikimr {
-namespace NKqp {
-
-using namespace NYql;
-using namespace NYql::NNodes;
-
-namespace {
-
-class TKqpSubstituteTransformer : public TSyncTransformerBase {
-public:
- TKqpSubstituteTransformer(TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx)
- : TxState(txState)
- , TransformCtx(transformCtx) {}
-
- TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- const auto& analyzeResults = TransformCtx->AnalyzeResults;
-
- if (analyzeResults.CanExecute) {
- return TStatus::Ok;
- }
-
- TNodeOnNodeOwnedMap replaceMap;
- for (size_t i = 0; i < analyzeResults.ExecutionRoots.size(); ++i) {
- auto newParamName = TxState->Tx().NewParamName();
-
- auto node = analyzeResults.ExecutionRoots[i].Node;
-
- YQL_ENSURE(node.Ref().GetTypeAnn());
- auto paramNode = Build<TCoParameter>(ctx, node.Pos())
- .Name().Build(newParamName)
- .Type(ExpandType(node.Pos(), *node.Ref().GetTypeAnn(), ctx))
- .Done();
-
- YQL_ENSURE(!TransformCtx->MkqlResults.empty());
- ui32 mkqlIndex = TransformCtx->MkqlResults.size() - 1;
- ui32 resultIndex = i;
- auto indexTuple = Build<TCoAtomList>(ctx, paramNode.Pos())
- .Add().Build(ToString(mkqlIndex))
- .Add().Build(ToString(resultIndex))
- .Done();
-
- paramNode.Ptr()->SetResult(indexTuple.Ptr());
-
- replaceMap.emplace(node.Raw(), paramNode.Ptr());
- }
-
- output = ctx.ReplaceNodes(std::move(input), replaceMap);
-
- return TStatus(TStatus::Repeat, true);
- }
-
-private:
- TIntrusivePtr<TKqpTransactionState> TxState;
- TIntrusivePtr<TKqlTransformContext> TransformCtx;
-};
-
-} // namespace
-
-TAutoPtr<IGraphTransformer> CreateKqpSubstituteTransformer(TIntrusivePtr<TKqpTransactionState> txState,
- TIntrusivePtr<TKqlTransformContext> transformCtx)
-{
- return new TKqpSubstituteTransformer(txState, transformCtx);
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp
index 90ae5876243..9f12a89f54a 100644
--- a/ydb/core/kqp/prepare/kqp_type_ann.cpp
+++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp
@@ -1,4 +1,4 @@
-#include "kqp_prepare_impl.h"
+#include "kqp_prepare.h"
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider_impl.h>
@@ -1346,29 +1346,5 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckQueryTransformer() {
});
}
-TAutoPtr<IGraphTransformer> CreateKqpCheckKiProgramTransformer() {
- return CreateFunctorTransformer(
- [](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
- output = input;
-
- YQL_ENSURE(TMaybeNode<TKiProgram>(input));
-
- auto program = TKiProgram(input);
- auto effectsType = program.Effects().Ptr()->GetTypeAnn();
- bool typeOk = EnsureListType(input->Pos(), *effectsType, ctx);
- if (typeOk) {
- auto listType = effectsType->Cast<TListExprType>();
- typeOk = listType->GetItemType()->GetKind() == ETypeAnnotationKind::Void;
- }
- if (!typeOk) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
- << "Invalid program effects type: " << FormatType(effectsType)));
- return TStatus::Error;
- }
-
- return TStatus::Ok;
- });
-}
-
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index 30eb9642409..a29c1d11c87 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -74,7 +74,7 @@ TIntrusivePtr<IKqpGateway> GetIcGateway(Tests::TServer& server) {
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(), false);
return CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
- server.GetRuntime()->GetNodeId(0), counters, MakeMiniKQLCompileServiceID());
+ server.GetRuntime()->GetNodeId(0), counters);
}
void TestListPathCommon(TIntrusivePtr<IKikimrGateway> gateway) {
diff --git a/ydb/core/kqp/ut/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/kqp_indexes_ut.cpp
index 9a52865690e..d4a26fc98b7 100644
--- a/ydb/core/kqp/ut/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_indexes_ut.cpp
@@ -35,7 +35,7 @@ TIntrusivePtr<NKqp::IKqpGateway> GetIcGateway(Tests::TServer& server) {
counters->TxProxyMon = new NTxProxy::TTxProxyMon(server.GetRuntime()->GetAppData(0).Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(server.GetRuntime()->GetAnyNodeActorSystem(), false);
return NKqp::CreateKikimrIcGateway(TestCluster, "/Root", std::move(loader), server.GetRuntime()->GetAnyNodeActorSystem(),
- server.GetRuntime()->GetNodeId(0), counters, MakeMiniKQLCompileServiceID());
+ server.GetRuntime()->GetNodeId(0), counters);
}
TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> gateway,