diff options
author | ulya-sidorina <yulia@ydb.tech> | 2022-11-18 18:38:39 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2022-11-18 18:38:39 +0300 |
commit | ea58f4f34a9a7a66d041931c3f91003f41f6d83c (patch) | |
tree | 5dda86b2a35fbea7ae6c16aed8189f02f5d20757 | |
parent | 20ba7932108cc6305f809d0011ea19b3fdb20d53 (diff) | |
download | ydb-ea58f4f34a9a7a66d041931c3f91003f41f6d83c.tar.gz |
support uncommitted changes in KQP
29 files changed, 782 insertions, 286 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp index e12ceca2c77..09f3ed5a430 100644 --- a/ydb/core/kqp/compile/kqp_compile.cpp +++ b/ydb/core/kqp/compile/kqp_compile.cpp @@ -355,7 +355,7 @@ public: Alloc.Acquire(); } - bool CompilePhysicalQuery(const TKqpPhysicalQuery& query, const TKiOperationList& tableOps, + bool CompilePhysicalQuery(const TKqpPhysicalQuery& query, const TKiDataQuery& dataQuery, NKqpProto::TKqpPhyQuery& queryProto, TExprContext& ctx) final { TGuard<TScopedAlloc> allocGuard(Alloc); @@ -364,7 +364,15 @@ public: YQL_ENSURE(querySettings.Type); queryProto.SetType(GetPhyQueryType(*querySettings.Type)); - auto ops = TableOperationsToProto(tableOps, ctx); + for (const auto& queryBlock : dataQuery.Blocks()) { + auto queryBlockSettings = TKiDataQueryBlockSettings::Parse(queryBlock); + if (queryBlockSettings.HasUncommittedChangesRead) { + queryProto.SetHasUncommittedChangesRead(true); + break; + } + } + + auto ops = TableOperationsToProto(dataQuery.Operations(), ctx); for (auto& op : ops) { const auto tableName = op.GetTable(); auto operation = static_cast<TYdbOperation>(op.GetOperation()); diff --git a/ydb/core/kqp/compile/kqp_compile.h b/ydb/core/kqp/compile/kqp_compile.h index 774bb3cc533..f9ed22a3b12 100644 --- a/ydb/core/kqp/compile/kqp_compile.h +++ b/ydb/core/kqp/compile/kqp_compile.h @@ -12,7 +12,7 @@ namespace NKqp { class IKqpQueryCompiler : public TThrRefBase { public: virtual bool CompilePhysicalQuery(const NYql::NNodes::TKqpPhysicalQuery& query, - const NYql::NNodes::TKiOperationList& tableOps, NKqpProto::TKqpPhyQuery& queryProto, + const NYql::NNodes::TKiDataQuery& dataQuery, NKqpProto::TKqpPhyQuery& queryProto, NYql::TExprContext& ctx) = 0; }; diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index 40aefd37de6..bc8c4f62777 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1271,7 +1271,7 @@ private: << ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString()); TEvDataShard::TEvProposeTransaction* ev; - if (Snapshot.IsValid() && ReadOnlyTx) { + if (Snapshot.IsValid() && (ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects())) { ev = new TEvDataShard::TEvProposeTransaction( NKikimrTxDataShard::TX_KIND_DATA, SelfId(), @@ -1574,7 +1574,7 @@ private: break; } - if (ReadOnlyTx && Request.Snapshot.IsValid()) { + if ((ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects()) && Request.Snapshot.IsValid()) { // Snapshot reads are always immediate Snapshot = Request.Snapshot; ImmediateTx = true; diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 8b323d48b71..7b084488c1b 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -321,7 +321,7 @@ "ListBase": "TKqlQueryResult" }, { - "Name": "TKqlQuery", + "Name": "TKqlQueryBlock", "Base": "TExprBase", "Match": {"Type": "Tuple"}, "Children": [ @@ -330,6 +330,18 @@ ] }, { + "Name": "TKqlQueryBlockList", + "ListBase": "TKqlQueryBlock" + }, + { + "Name": "TKqlQuery", + "Base": "TExprBase", + "Match": {"Type": "Tuple"}, + "Children": [ + {"Index": 0, "Name": "Blocks", "Type": "TKqlQueryBlockList"} + ] + }, + { "Name": "TKqpEffects", "VarArgBase": "TExprBase", "Match": {"Type": "Callable", "Name": "KqpEffects"} diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 12262825433..6865597f335 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -722,8 +722,9 @@ public: case EKikimrQueryType::YqlScript: if (useScanQuery) { ui64 rowsLimit = 0; - if (!dataQuery.Results().Empty()) { - rowsLimit = FromString<ui64>(dataQuery.Results().Item(0).RowsLimit()); + if (!dataQuery.Blocks().Empty() && !dataQuery.Blocks().Item(0).Results().Empty()) { + const auto& queryBlock = dataQuery.Blocks().Item(0); + rowsLimit = FromString<ui64>(queryBlock.Results().Item(0).RowsLimit()); } if (SessionCtx->Query().PrepareOnly) { @@ -800,12 +801,19 @@ private: return *settings.UseScanQuery; } - if (query.Effects().ArgCount() > 0) { + if (query.Blocks().Size() != 1) { + // Don't use ScanQuery for muiltiple blocks query + return false; + } + + const auto& queryBlock = query.Blocks().Item(0); + + if (queryBlock.Effects().ArgCount() > 0) { // Do not use ScanQuery for queries with effects. return false; } - if (query.Results().Size() != 1) { + if (queryBlock.Results().Size() != 1) { // Do not use ScanQuery for queries with multiple result sets. return false; } @@ -829,7 +837,7 @@ private: bool hasIndexReads = false; bool hasJoins = false; - VisitExpr(query.Results().Ptr(), [&hasIndexReads, &hasJoins] (const TExprNode::TPtr& exprNode) { + VisitExpr(queryBlock.Results().Ptr(), [&hasIndexReads, &hasJoins] (const TExprNode::TPtr& exprNode) { auto node = TExprBase(exprNode); if (auto read = node.Maybe<TKiReadTable>()) { diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index e71a4fb8408..6dc2b053a0e 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -142,12 +142,19 @@ public: TKiDataQuery dataQuery(query); - if (dataQuery.Results().Size() != 1) { + if (dataQuery.Blocks().Size() != 1) { + ctx.AddError(YqlIssue(ctx.GetPosition(dataQuery.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, + "Scan query should have single query block.")); + return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); + } + + const auto& queryBlock = dataQuery.Blocks().Item(0); + if (queryBlock.Results().Size() != 1) { ctx.AddError(YqlIssue(ctx.GetPosition(dataQuery.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, "Scan query should have a single result set.")); return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); } - if (dataQuery.Effects().ArgCount() > 0) { + if (queryBlock.Effects().ArgCount() > 0) { ctx.AddError(YqlIssue(ctx.GetPosition(dataQuery.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED, "Scan query cannot have data modifications.")); return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); @@ -166,15 +173,17 @@ private: auto* queryCtx = TransformCtx->QueryCtx.Get(); if (queryCtx->Type == EKikimrQueryType::Dml) { - ui32 resultsCount = dataQuery.Results().Size(); - for (ui32 i = 0; i < resultsCount; ++i) { - auto& result = *queryCtx->PreparingQuery->AddResults(); - result.SetKqlIndex(0); - result.SetResultIndex(i); - for (const auto& column : dataQuery.Results().Item(i).Columns()) { - *result.AddColumnHints() = column.Value(); + ui32 resultsCount = 0; + for (const auto& block : dataQuery.Blocks()) { + for (ui32 i = 0; i < block.Results().Size(); ++i, ++resultsCount) { + auto& result = *queryCtx->PreparingQuery->AddResults(); + result.SetKqlIndex(0); + result.SetResultIndex(resultsCount); + for (const auto& column : block.Results().Item(i).Columns()) { + *result.AddColumnHints() = column.Value(); + } + result.SetRowsLimit(FromString<ui64>(block.Results().Item(i).RowsLimit())); } - result.SetRowsLimit(FromString<ui64>(dataQuery.Results().Item(i).RowsLimit())); } } else { // scan query @@ -251,8 +260,7 @@ private: auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery; TKqpPhysicalQuery physicalQuery(transformedQuery); auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry); - auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQuery.Operations(), - *preparedQuery.MutablePhysicalQuery(), ctx); + auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQuery, *preparedQuery.MutablePhysicalQuery(), ctx); if (!ret) { ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to compile physical query.")); return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues())); diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index e32e895883f..6033b54279a 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -1363,12 +1363,14 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckQueryTransformer() { YQL_ENSURE(TMaybeNode<TKqlQuery>(input)); auto query = TKqlQuery(input); - for (const auto& result : query.Results()) { - if (!EnsureTupleSize(result.Ref(), 2, ctx)) { - return TStatus::Error; - } - if (!EnsureListType(result.Value().Ref(), ctx)) { - return TStatus::Error; + for (const auto& block : query.Blocks()) { + for (const auto& result : block.Results()) { + if (!EnsureTupleSize(result.Ref(), 2, ctx)) { + return TStatus::Error; + } + if (!EnsureListType(result.Value().Ref(), ctx)) { + return TStatus::Error; + } } } diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index b40b1e7b5be..67681c51db6 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -473,39 +473,41 @@ public: } TVector<TExprBase> queryResults; - if (!query.Results().Empty()) { - auto tx = BuildTx(query.Results().Ptr(), ctx, false); - if (!tx) { - return TStatus::Error; - } + for (const auto& block : query.Blocks()) { + if (!block.Results().Empty()) { + auto tx = BuildTx(block.Results().Ptr(), ctx, false); + if (!tx) { + return TStatus::Error; + } - BuildCtx->PhysicalTxs.emplace_back(tx.Cast()); + BuildCtx->PhysicalTxs.emplace_back(tx.Cast()); - for (ui32 i = 0; i < query.Results().Size(); ++i) { - const auto& result = query.Results().Item(i); - auto binding = Build<TKqpTxResultBinding>(ctx, query.Pos()) - .Type(ExpandType(query.Pos(), *result.Value().Ref().GetTypeAnn(), ctx)) - .TxIndex() - .Build(ToString(BuildCtx->PhysicalTxs.size() - 1)) - .ResultIndex() - .Build(ToString(i)) - .Done(); + for (ui32 i = 0; i < block.Results().Size(); ++i) { + const auto& result = block.Results().Item(i); + auto binding = Build<TKqpTxResultBinding>(ctx, block.Pos()) + .Type(ExpandType(block.Pos(), *result.Value().Ref().GetTypeAnn(), ctx)) + .TxIndex() + .Build(ToString(BuildCtx->PhysicalTxs.size() - 1)) + .ResultIndex() + .Build(ToString(i)) + .Done(); - queryResults.emplace_back(std::move(binding)); + queryResults.emplace_back(std::move(binding)); + } } - } - if (!query.Effects().Empty()) { - auto tx = BuildTx(query.Effects().Ptr(), ctx, /* isPrecompute */ false); - if (!tx) { - return TStatus::Error; - } + if (!block.Effects().Empty()) { + auto tx = BuildTx(block.Effects().Ptr(), ctx, /* isPrecompute */ false); + if (!tx) { + return TStatus::Error; + } - if (!CheckEffectsTx(tx.Cast(), ctx)) { - return TStatus::Error; - } + if (!CheckEffectsTx(tx.Cast(), ctx)) { + return TStatus::Error; + } - BuildCtx->PhysicalTxs.emplace_back(tx.Cast()); + BuildCtx->PhysicalTxs.emplace_back(tx.Cast()); + } } TKqpPhyQuerySettings querySettings; diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index 48fe4039bf6..27ba20e8ea4 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -415,59 +415,70 @@ template <bool GroupEffectsByTable> TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { - TVector<TExprBase> builtEffects; + TVector<TKqlQueryBlock> queryBlocks; + queryBlocks.reserve(query.Blocks().Size()); - if constexpr (GroupEffectsByTable) { - TMap<TStringBuf, TVector<TKqlTableEffect>> tableEffectsMap; - for (const auto& maybeEffect: query.Effects()) { - if (const auto maybeList = maybeEffect.Maybe<TExprList>()) { - for (const auto effect : maybeList.Cast()) { - YQL_ENSURE(effect.Maybe<TKqlTableEffect>()); - auto tableEffect = effect.Cast<TKqlTableEffect>(); + for (const auto& block : query.Blocks()) { + TVector<TExprBase> builtEffects; + + if constexpr(GroupEffectsByTable) { + TMap<TStringBuf, TVector<TKqlTableEffect>> tableEffectsMap; + for (const auto& maybeEffect : block.Effects()) { + if (const auto maybeList = maybeEffect.Maybe<TExprList>()) { + for (const auto effect : maybeList.Cast()) { + YQL_ENSURE(effect.Maybe<TKqlTableEffect>()); + auto tableEffect = effect.Cast<TKqlTableEffect>(); + + tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect); + } + } else { + YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>()); + auto tableEffect = maybeEffect.Cast<TKqlTableEffect>(); tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect); } - } else { - YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>()); - auto tableEffect = maybeEffect.Cast<TKqlTableEffect>(); - - tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect); } - } - for (const auto& pair: tableEffectsMap) { - if (!BuildEffects(query.Pos(), pair.second, ctx, kqpCtx, builtEffects)) { - return {}; + for (const auto& pair: tableEffectsMap) { + if (!BuildEffects(block.Pos(), pair.second, ctx, kqpCtx, builtEffects)) { + return {}; + } } - } - } else { - builtEffects.reserve(query.Effects().Size() * 2); - - for (const auto& maybeEffect : query.Effects()) { - if (const auto maybeList = maybeEffect.Maybe<TExprList>()) { - for (const auto effect : maybeList.Cast()) { - YQL_ENSURE(effect.Maybe<TKqlTableEffect>()); - auto tableEffect = effect.Cast<TKqlTableEffect>(); + } else { + builtEffects.reserve(block.Effects().Size() * 2); + + for (const auto &maybeEffect : block.Effects()) { + if (const auto maybeList = maybeEffect.Maybe<TExprList>()) { + for (const auto effect : maybeList.Cast()) { + YQL_ENSURE(effect.Maybe<TKqlTableEffect>()); + auto tableEffect = effect.Cast<TKqlTableEffect>(); + + if (!BuildEffects(block.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) { + return {}; + } + } + } else { + YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>()); + auto tableEffect = maybeEffect.Cast<TKqlTableEffect>(); - if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) { + if (!BuildEffects(block.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) { return {}; } } - } else { - YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>()); - auto tableEffect = maybeEffect.Cast<TKqlTableEffect>(); - - if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) { - return {}; - } } } + + queryBlocks.emplace_back(Build<TKqlQueryBlock>(ctx, block.Pos()) + .Results(block.Results()) + .Effects() + .Add(builtEffects) + .Build() + .Done()); } return Build<TKqlQuery>(ctx, query.Pos()) - .Results(query.Results()) - .Effects() - .Add(builtEffects) + .Blocks() + .Add(queryBlocks) .Build() .Done(); } @@ -487,11 +498,13 @@ TAutoPtr<IGraphTransformer> CreateKqpQueryEffectsTransformer(const TIntrusivePtr bool requireBuild = false; bool hasBuilt = false; - for (const auto& effect : query.Effects()) { - if (!effect.Maybe<TDqOutput>()) { - requireBuild = true; - } else { - hasBuilt = true; + for (const auto& block : query.Blocks()) { + for (const auto& effect : block.Effects()) { + if (!effect.Maybe<TDqOutput>()) { + requireBuild = true; + } else { + hasBuilt = true; + } } } diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index 93c7bea5af6..0fed2b1100b 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -748,40 +748,51 @@ TIntrusivePtr<TKikimrTableMetadata> GetIndexMetadata(const TKqlReadTableIndex& r TMaybe<TKqlQuery> BuildKqlQuery(TKiDataQuery query, const TKikimrTablesData& tablesData, TExprContext& ctx, bool withSystemColumns, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx) { - TVector<TExprBase> kqlEffects; - for (const auto& effect : query.Effects()) { - if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) { - auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData); - kqlEffects.push_back(result); - } + TVector<TKqlQueryBlock> queryBlocks; + queryBlocks.reserve(query.Blocks().Size()); + + for (const auto& block : query.Blocks()) { + TVector <TExprBase> kqlEffects; + for (const auto& effect : block.Effects()) { + if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) { + auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData); + kqlEffects.push_back(result); + } - if (auto maybeUpdate = effect.Maybe<TKiUpdateTable>()) { - auto results = HandleUpdateTable(maybeUpdate.Cast(), ctx, tablesData, withSystemColumns, kqpCtx); - kqlEffects.insert(kqlEffects.end(), results.begin(), results.end()); + if (auto maybeUpdate = effect.Maybe<TKiUpdateTable>()) { + auto results = HandleUpdateTable(maybeUpdate.Cast(), ctx, tablesData, withSystemColumns, kqpCtx); + kqlEffects.insert(kqlEffects.end(), results.begin(), results.end()); + } + + if (auto maybeDelete = effect.Maybe<TKiDeleteTable>()) { + auto results = HandleDeleteTable(maybeDelete.Cast(), ctx, tablesData, withSystemColumns, kqpCtx); + kqlEffects.insert(kqlEffects.end(), results.begin(), results.end()); + } } - if (auto maybeDelete = effect.Maybe<TKiDeleteTable>()) { - auto results = HandleDeleteTable(maybeDelete.Cast(), ctx, tablesData, withSystemColumns, kqpCtx); - kqlEffects.insert(kqlEffects.end(), results.begin(), results.end()); + TVector <TKqlQueryResult> kqlResults; + kqlResults.reserve(block.Results().Size()); + for (const auto& kiResult : block.Results()) { + kqlResults.emplace_back( + Build<TKqlQueryResult>(ctx, kiResult.Pos()) + .Value(kiResult.Value()) + .ColumnHints(kiResult.Columns()) + .Done()); } - } - TVector<TKqlQueryResult> kqlResults; - kqlResults.reserve(query.Results().Size()); - for (const auto& kiResult : query.Results()) { - kqlResults.emplace_back( - Build<TKqlQueryResult>(ctx, kiResult.Pos()) - .Value(kiResult.Value()) - .ColumnHints(kiResult.Columns()) - .Done()); + queryBlocks.emplace_back(Build<TKqlQueryBlock>(ctx, query.Pos()) + .Results() + .Add(kqlResults) + .Build() + .Effects() + .Add(kqlEffects) + .Build() + .Done()); } TKqlQuery kqlQuery = Build<TKqlQuery>(ctx, query.Pos()) - .Results() - .Add(kqlResults) - .Build() - .Effects() - .Add(kqlEffects) + .Blocks() + .Add(queryBlocks) .Build() .Done(); diff --git a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp index c34a094ff3e..b2f4a0a42cb 100644 --- a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp +++ b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp @@ -20,10 +20,12 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() { auto query = TKqlQuery(input); YQL_ENSURE(query.Ref().GetTypeAnn()); - for (const auto& result : query.Effects()) { - if (!result.Maybe<TDqOutput>()) { - ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), "Failed to build query effects.")); - return TStatus::Error; + for (const auto& block : query.Blocks()) { + for (const auto& effect : block.Effects()) { + if (!effect.Maybe<TDqOutput>()) { + ctx.AddError(TIssue(ctx.GetPosition(effect.Pos()), "Failed to build query effects.")); + return TStatus::Error; + } } } diff --git a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp index 5c4e648c8e6..784d6ed5052 100644 --- a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp +++ b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp @@ -22,10 +22,6 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP TExprBase inputExpr(input); auto query = inputExpr.Cast<TKqlQuery>(); - if (query.Results().Empty()) { - return TStatus::Ok; - } - auto predicate = [](const NYql::TExprNode::TPtr& node) { return TMaybeNode<TDqPhyPrecompute>(node).IsValid(); }; @@ -39,41 +35,48 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP return precompute != nullptr; }; - // Currently we compute all query results in the last physical transaction, so we cannot - // precompute them early if no explicit DqPhyPrecompute is specified. - // Removing precompute in queries with multiple physical tx can lead to additional computations. - // More proper way to fix this is to allow partial result computation in early physical - // transactions. - bool omitResultPrecomputes = true; - for (const auto& result: query.Results()) { - if (result.Value().Maybe<TDqPhyPrecompute>()) { + TNodeOnNodeOwnedMap replaces; + for (const auto& block : query.Blocks()) { + + if (block.Results().Empty()) { continue; } - if (!hasPrecomputes(result.Value())) { - omitResultPrecomputes = false; - break; + // Currently we compute all query results in the last physical transaction, so we cannot + // precompute them early if no explicit DqPhyPrecompute is specified. + // Removing precompute in queries with multiple physical tx can lead to additional computations. + // More proper way to fix this is to allow partial result computation in early physical + // transactions. + bool omitResultPrecomputes = true; + for (const auto& result : block.Results()) { + if (result.Value().Maybe<TDqPhyPrecompute>()) { + continue; + } + + if (!hasPrecomputes(result.Value())) { + omitResultPrecomputes = false; + break; + } } - } - for (const auto& effect: query.Effects()) { - if (!hasPrecomputes(effect)) { - omitResultPrecomputes = false; - break; + for (const auto& effect: block.Effects()) { + if (!hasPrecomputes(effect)) { + omitResultPrecomputes = false; + break; + } } - } - TNodeOnNodeOwnedMap replaces; - for (const auto& queryResult: query.Results()) { - TExprBase node(queryResult.Value()); - - // TODO: Missing support for DqCnValue results in scan queries - if (node.Maybe<TDqPhyPrecompute>() && omitResultPrecomputes && !kqpCtx.IsScanQuery()) { - replaces[node.Raw()] = node.Cast<TDqPhyPrecompute>().Connection().Ptr(); - } else { - auto result = DqBuildPureExprStage(node, ctx); - if (result.Raw() != node.Raw()) { - YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of pure query #" << node.Raw()->UniqueId(); - replaces[node.Raw()] = result.Ptr(); + for (const auto& queryResult : block.Results()) { + TExprBase node(queryResult.Value()); + + // TODO: Missing support for DqCnValue results in scan queries + if (node.Maybe<TDqPhyPrecompute>() && omitResultPrecomputes && !kqpCtx.IsScanQuery()) { + replaces[node.Raw()] = node.Cast<TDqPhyPrecompute>().Connection().Ptr(); + } else { + auto result = DqBuildPureExprStage(node, ctx); + if (result.Raw() != node.Raw()) { + YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of pure query #" << node.Raw()->UniqueId(); + replaces[node.Raw()] = result.Ptr(); + } } } } @@ -87,13 +90,15 @@ TStatus KqpBuildUnionResult(const TExprNode::TPtr& input, TExprNode::TPtr& outpu auto query = inputExpr.Cast<TKqlQuery>(); TNodeOnNodeOwnedMap replaces; - for (const auto& queryResult: query.Results()) { - TExprBase node(queryResult.Value()); + for (const auto& block : query.Blocks()) { + for (const auto& queryResult: block.Results()) { + TExprBase node(queryResult.Value()); - auto result = DqBuildExtendStage(node, ctx); - if (result.Raw() != node.Raw()) { - YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of union #" << node.Raw()->UniqueId(); - replaces[node.Raw()] = result.Ptr(); + auto result = DqBuildExtendStage(node, ctx); + if (result.Raw() != node.Raw()) { + YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of union #" << node.Raw()->UniqueId(); + replaces[node.Raw()] = result.Ptr(); + } } } output = ctx.ReplaceNodes(TExprNode::TPtr(input), replaces); @@ -107,15 +112,9 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu TExprBase inputExpr(input); auto query = inputExpr.Cast<TKqlQuery>(); - const size_t resultsCount = query.Results().Size(); - - if (resultsCount <= 1) { - return TStatus::Ok; - } - struct TKqlQueryResultInfo { TMaybeNode<TKqlQueryResult> Node; - TVector<size_t> Indexes; + TVector<std::pair<size_t, size_t>> Indexes; TMaybeNode<TDqStage> ReplicateStage; size_t NextIndex = 0; }; @@ -123,20 +122,23 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu TNodeMap<TKqlQueryResultInfo> kqlQueryResults; bool hasDups = false; - for (size_t i = 0; i < resultsCount; ++i) { - TKqlQueryResult result = query.Results().Item(i); + for (size_t blockId = 0; blockId < query.Blocks().Size(); ++blockId) { + TKqlQueryBlock block = query.Blocks().Item(blockId); + for (size_t resultId = 0; resultId < block.Results().Size(); ++resultId) { + TKqlQueryResult result = block.Results().Item(resultId); - if (!result.Value().Maybe<TDqConnection>()) { - return TStatus::Ok; - } + if (!result.Value().Maybe<TDqConnection>()) { + return TStatus::Ok; + } - auto& info = kqlQueryResults[result.Raw()]; - if (info.Indexes.empty()) { - info.Node = result; - } else { - hasDups = true; + auto& info = kqlQueryResults[result.Raw()]; + if (info.Indexes.empty()) { + info.Node = result; + } else { + hasDups = true; + } + info.Indexes.push_back({blockId, resultId}); } - info.Indexes.push_back(i); } if (!hasDups) { @@ -168,32 +170,41 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu } } - TVector<TExprNode::TPtr> results(resultsCount); - - for (size_t i = 0; i < resultsCount; ++i) { - auto& info = kqlQueryResults.at(query.Results().Item(i).Raw()); - - if (info.Indexes.size() == 1) { - results[i] = info.Node.Cast().Ptr(); - } else { - results[i] = Build<TKqlQueryResult>(ctx, query.Pos()) - .Value<TDqCnUnionAll>() - .Output() - .Stage(info.ReplicateStage.Cast()) - .Index().Build(ToString(info.NextIndex)) + TVector<TKqlQueryBlock> queryBlocks; + queryBlocks.reserve(query.Blocks().Size()); + for (const auto& block : query.Blocks()) { + TVector<TExprNode::TPtr> results(block.Results().Size()); + for (size_t i = 0; i < block.Results().Size(); ++i) { + auto& info = kqlQueryResults.at(block.Results().Item(i).Raw()); + + if (info.Indexes.size() == 1) { + results[i] = info.Node.Cast().Ptr(); + } else { + results[i] = Build<TKqlQueryResult>(ctx, query.Pos()) + .Value<TDqCnUnionAll>() + .Output() + .Stage(info.ReplicateStage.Cast()) + .Index().Build(ToString(info.NextIndex)) + .Build() .Build() - .Build() - .ColumnHints(info.Node.Cast().ColumnHints()) - .Done().Ptr(); - info.NextIndex++; + .ColumnHints(info.Node.Cast().ColumnHints()) + .Done().Ptr(); + info.NextIndex++; + } } + + queryBlocks.emplace_back(Build<TKqlQueryBlock>(ctx, block.Pos()) + .Results() + .Add(results) + .Build() + .Effects(block.Effects()) + .Done()); } output = Build<TKqlQuery>(ctx, query.Pos()) - .Results() - .Add(results) + .Blocks() + .Add(queryBlocks) .Build() - .Effects(query.Effects()) .Done().Ptr(); return TStatus::Ok; diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp index 67bdcda3bf6..a8d17093569 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp @@ -234,6 +234,13 @@ private: return TStatus::Ok; } + TStatus HandleDataQueryBlock(TKiDataQueryBlock node, TExprContext& ctx) override { + Y_UNUSED(node); + Y_UNUSED(ctx); + + return TStatus::Ok; + } + TStatus HandleDataQuery(TKiDataQuery node, TExprContext& ctx) override { Y_UNUSED(ctx); for (const auto& op : node.Operations()) { @@ -713,6 +720,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt return HandleCommit(node.Cast(), ctx); } + if (auto node = callable.Maybe<TKiDataQueryBlock>()) { + return HandleDataQueryBlock(node.Cast(), ctx); + } + if (auto node = callable.Maybe<TKiDataQuery>()) { return HandleDataQuery(node.Cast(), ctx); } diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index c94eb2b2093..901e6f417dd 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -521,9 +521,16 @@ public: auto exec = execQuery.Cast(); auto query = exec.Query(); - auto results = query.Results(); - auto result = results.Item(index); + ui32 blockId = 0; + ui32 startBlockIndex = 0; + while (blockId < query.Blocks().Size() && startBlockIndex + query.Blocks().Item(blockId).Results().Size() <= index) { + startBlockIndex += query.Blocks().Item(blockId).Results().Size(); + ++blockId; + } + auto results = query.Blocks().Item(blockId).Results(); + + auto result = results.Item(index - startBlockIndex); ui64 rowsLimit = ::FromString<ui64>(result.RowsLimit()); if (!rowsLimit) { if (!fillSettings.RowsLimitPerWrite) { @@ -546,12 +553,13 @@ public: .RowsLimit().Build(ToString(rowsLimit)) .Done(); - auto newResults = ctx.ChangeChild(results.Ref(), index, newResult.Ptr()); + auto newResults = ctx.ChangeChild(results.Ref(), index - startBlockIndex, newResult.Ptr()); + auto newQueryBlock = ctx.ChangeChild(query.Blocks().Item(blockId).Ref(), 0, std::move(newResults)); + auto newQueryBlocks = ctx.ChangeChild(query.Blocks().Ref(), blockId, std::move(newQueryBlock)); auto newQuery = Build<TKiDataQuery>(ctx, query.Pos()) .Operations(query.Operations()) - .Results(newResults) - .Effects(query.Effects()) + .Blocks(newQueryBlocks) .Done(); auto newExec = Build<TKiExecDataQuery>(ctx, exec.Pos()) diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index aad25b01820..55527734af4 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1312,6 +1312,7 @@ private: NKikimrKqp::EIsolationLevel isolationLevel, TExprContext& ctx) { bool strictDml = SessionCtx->Config().StrictDml.Get(cluster).GetRef(); + bool enableImmediateEffects = SessionCtx->Config().FeatureFlags.GetEnableKqpImmediateEffects(); auto queryType = SessionCtx->Query().Type; TVector<NKqpProto::TKqpTableInfo> tableInfo; @@ -1325,10 +1326,12 @@ private: if (!SessionCtx->HasTx()) { TKikimrTransactionContextBase emptyCtx; - return emptyCtx.ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml, queryType, ctx); + return emptyCtx.ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml, enableImmediateEffects, + queryType, ctx); } - return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml, queryType, ctx); + return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml, + enableImmediateEffects, queryType, ctx); } bool ApplyDdlOperation(const TString& cluster, TPositionHandle pos, const TString& table, diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json index d96428ed0c6..fd9d0c06ab5 100644 --- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json +++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json @@ -245,13 +245,26 @@ "Match": {"Type": "Callable", "Name": "KiEffects"} }, { + "Name": "TKiDataQueryBlock", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "TKiDataQueryBlock"}, + "Children": [ + {"Index": 0, "Name": "Results", "Type": "TKiResultList"}, + {"Index": 1, "Name": "Effects", "Type": "TKiEffects"}, + {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"} + ] + }, + { + "Name": "TKiDataQueryBlockList", + "ListBase": "TKiDataQueryBlock" + }, + { "Name": "TKiDataQuery", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "KiDataQuery"}, "Children": [ {"Index": 0, "Name": "Operations", "Type": "TKiOperationList"}, - {"Index": 1, "Name": "Results", "Type": "TKiResultList"}, - {"Index": 2, "Name": "Effects", "Type": "TKiEffects"} + {"Index": 1, "Name": "Blocks", "Type": "TKiDataQueryBlockList"} ] }, { diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 7526e060fa3..b312ff1eb7b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -64,10 +64,16 @@ ui64 GetResultRowsLimit(const TResWriteBase& resWrite) { } struct TKiExploreTxResults { + struct TKiQueryBlock { + TVector<TExprBase> Results; + TVector<TExprBase> Effects; + THashSet<std::string_view> TablesWithEffects; + bool HasUncommittedChangesRead = false; + }; + THashSet<const TExprNode*> Ops; TVector<TExprBase> Sync; - TVector<TExprBase> Results; - TVector<TExprBase> Effects; + TVector<TKiQueryBlock> QueryBlocks; TVector<TKiOperation> TableOperations; bool HasExecute; @@ -90,6 +96,44 @@ struct TKiExploreTxResults { } } + void AddEffect(const TExprBase& effect, std::string_view table) { + if (QueryBlocks.empty()) { + AddQueryBlock(); + } + + auto& curBlock = QueryBlocks.back(); + curBlock.Effects.push_back(effect); + curBlock.TablesWithEffects.insert(table); + } + + void AddResult(const TExprBase& result) { + if (QueryBlocks.empty()) { + AddQueryBlock(); + } + + auto& curBlock = QueryBlocks.back(); + curBlock.Results.push_back(result); + } + + bool HasEffects(std::string_view table) { + if (QueryBlocks.empty()) { + return false; + } + + auto& curBlock = QueryBlocks.back(); + return curBlock.TablesWithEffects.contains(table); + } + + void AddQueryBlock() { + QueryBlocks.emplace_back(); + } + + void SetBlockHasUncommittedChangesRead() { + YQL_ENSURE(!QueryBlocks.empty()); + auto& curBlock = QueryBlocks.back(); + curBlock.HasUncommittedChangesRead = true; + } + TKiExploreTxResults() : HasExecute(false) {} }; @@ -133,6 +177,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Select, read.Pos(), ctx)); + if (txRes.HasEffects(table)) { + txRes.AddQueryBlock(); + txRes.SetBlockHasUncommittedChangesRead(); + } return result; } @@ -147,7 +195,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto result = ExploreTx(write.World(), ctx, dataSink, txRes); auto tableOp = GetTableOp(write); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, write.Pos(), ctx)); - txRes.Effects.push_back(node); + txRes.AddEffect(node, table); return result; } @@ -161,7 +209,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T txRes.Ops.insert(node.Raw()); auto result = ExploreTx(update.World(), ctx, dataSink, txRes); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Update, update.Pos(), ctx)); - txRes.Effects.push_back(node); + txRes.AddEffect(node, table); return result; } @@ -175,7 +223,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T txRes.Ops.insert(node.Raw()); auto result = ExploreTx(del.World(), ctx, dataSink, txRes); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Delete, del.Pos(), ctx)); - txRes.Effects.push_back(node); + txRes.AddEffect(node, table); return result; } @@ -327,7 +375,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T { txRes.Ops.insert(node.Raw()); bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes); - txRes.Results.push_back(node); + txRes.AddResult(node); return result; } @@ -408,28 +456,42 @@ TExprNode::TPtr MakeSchemeTx(TCoCommit commit, TExprContext& ctx) { } TKiDataQuery MakeKiDataQuery(TExprBase node, const TKiExploreTxResults& txExplore, TExprContext& ctx) { - TExprNode::TListType queryResults; - for (auto& result : txExplore.Results) { - auto resWrite = result.Cast<TResWriteBase>(); + TVector<TKiDataQueryBlock> queryBlocks; + queryBlocks.reserve(txExplore.QueryBlocks.size()); - auto kiResult = Build<TKiResult>(ctx, node.Pos()) - .Value(resWrite.Data()) - .Columns(GetResultColumns(resWrite, ctx)) - .RowsLimit().Build(GetResultRowsLimit(resWrite)) - .Done(); + for (const auto& block : txExplore.QueryBlocks) { + TKiDataQueryBlockSettings settings; + settings.HasUncommittedChangesRead = block.HasUncommittedChangesRead; - queryResults.push_back(kiResult.Ptr()); + TExprNode::TListType queryResults; + for (auto& result : block.Results) { + auto resWrite = result.Cast<TResWriteBase>(); + + auto kiResult = Build<TKiResult>(ctx, node.Pos()) + .Value(resWrite.Data()) + .Columns(GetResultColumns(resWrite, ctx)) + .RowsLimit().Build(GetResultRowsLimit(resWrite)) + .Done(); + + queryResults.push_back(kiResult.Ptr()); + } + queryBlocks.emplace_back(Build<TKiDataQueryBlock>(ctx, node.Pos()) + .Results() + .Add(queryResults) + .Build() + .Effects() + .Add(block.Effects) + .Build() + .Settings(settings.BuildNode(ctx, node.Pos())) + .Done()); } auto query = Build<TKiDataQuery>(ctx, node.Pos()) .Operations() .Add(txExplore.TableOperations) .Build() - .Results() - .Add(queryResults) - .Build() - .Effects() - .Add(txExplore.Effects) + .Blocks() + .Add(queryBlocks) .Build() .Done(); @@ -527,36 +589,44 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx) { .Input(execQuery) .Done(); - if (!txExplore.Results.empty()) { + + bool hasResults = false; + for (const auto& block : txExplore.QueryBlocks) { + hasResults = hasResults || !block.Results.empty(); + } + + if (hasResults) { auto execRight = Build<TCoRight>(ctx, node.Pos()) .Input(execQuery) .Done() .Ptr(); - for (size_t i = 0; i < txExplore.Results.size(); ++i) { - auto result = txExplore.Results[i].Cast<TResWriteBase>(); - - auto extractValue = Build<TCoNth>(ctx, node.Pos()) - .Tuple(execRight) - .Index().Build(i) - .Done() - .Ptr(); - - auto newResult = ctx.ChangeChild( - *ctx.ChangeChild( - result.Ref(), - TResWriteBase::idx_World, - execWorld.Ptr() - ), - TResWriteBase::idx_Data, - std::move(extractValue) - ); - - execWorld = Build<TCoCommit>(ctx, node.Pos()) - .World(newResult) - .DataSink<TResultDataSink>() - .Build() - .Done(); + for (auto& block : txExplore.QueryBlocks) { + for (size_t i = 0; i < block.Results.size(); ++i) { + auto result = block.Results[i].Cast<TResWriteBase>(); + + auto extractValue = Build<TCoNth>(ctx, node.Pos()) + .Tuple(execRight) + .Index().Build(i) + .Done() + .Ptr(); + + auto newResult = ctx.ChangeChild( + *ctx.ChangeChild( + result.Ref(), + TResWriteBase::idx_World, + execWorld.Ptr() + ), + TResWriteBase::idx_Data, + std::move(extractValue) + ); + + execWorld = Build<TCoCommit>(ctx, node.Pos()) + .World(newResult) + .DataSink<TResultDataSink>() + .Build() + .Done(); + } } } @@ -586,9 +656,7 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont return node.Ptr(); } - auto dataQuery = Build<TKiDataQuery>(ctx, node.Pos()) - .Operations() - .Build() + auto queryBlock = Build<TKiDataQueryBlock>(ctx, node.Pos()) .Results() .Add() .Value(resFill.Data()) @@ -600,6 +668,14 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont .Build() .Done(); + auto dataQuery = Build<TKiDataQuery>(ctx, node.Pos()) + .Operations() + .Build() + .Blocks() + .Add(queryBlock) + .Build() + .Done(); + auto exec = Build<TKiExecDataQuery>(ctx, node.Pos()) .World(resFill.World()) .DataSink<TKiDataSink>() diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index 0e4f11db3f1..33f62431415 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -50,6 +50,7 @@ struct TKikimrData { DataSinkNames.insert(TKiCreateGroup::CallableName()); DataSinkNames.insert(TKiAlterGroup::CallableName()); DataSinkNames.insert(TKiDropGroup::CallableName()); + DataSinkNames.insert(TKiDataQueryBlock::CallableName()); DataSinkNames.insert(TKiDataQuery::CallableName()); DataSinkNames.insert(TKiExecDataQuery::CallableName()); DataSinkNames.insert(TKiEffects::CallableName()); @@ -571,6 +572,33 @@ bool AddDmlIssue(const TIssue& issue, bool strictDml, TExprContext& ctx) { } } +TKiDataQueryBlockSettings TKiDataQueryBlockSettings::Parse(const NNodes::TKiDataQueryBlock& node) { + TKiDataQueryBlockSettings settings; + + for (const auto& tuple : node.Settings()) { + auto name = tuple.Name().Value(); + if (name == HasUncommittedChangesReadSettingName) { + settings.HasUncommittedChangesRead = true; + } + } + + return settings; +} + +NNodes::TCoNameValueTupleList TKiDataQueryBlockSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const { + TVector<TCoNameValueTuple> settings; + + if (HasUncommittedChangesRead) { + settings.push_back(Build<TCoNameValueTuple>(ctx, pos) + .Name().Build(HasUncommittedChangesReadSettingName) + .Done()); + } + + return Build<TCoNameValueTupleList>(ctx, pos) + .Add(settings) + .Done(); +} + TKiExecDataQuerySettings TKiExecDataQuerySettings::Parse(TKiExecDataQuery exec) { TKiExecDataQuerySettings settings; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 396bdc12520..149eea729b8 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -319,7 +319,7 @@ public: template<class IterableKqpTableOps, class IterableKqpTableInfos> bool ApplyTableOperations(const IterableKqpTableOps& operations, const IterableKqpTableInfos& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml, - EKikimrQueryType queryType, TExprContext& ctx) + bool enableImmediateEffects, EKikimrQueryType queryType, TExprContext& ctx) { if (IsClosed()) { TString message = TStringBuilder() << "Cannot perform operations on closed transaction."; @@ -396,8 +396,8 @@ public: } auto& currentOps = TableOperations[table]; - - if (currentOps & KikimrModifyOps()) { + bool currentModify = currentOps & KikimrModifyOps(); + if (currentModify && !enableImmediateEffects) { if (KikimrRequireUnmodifiedOps() & newOp) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed on previously modified table: " << table; @@ -430,7 +430,7 @@ public: // TODO: KIKIMR-3206 bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn); bool newUpdate = newOp == TYdbOperation::Update; - if (currentDelete && newUpdate) { + if (currentDelete && newUpdate && !enableImmediateEffects) { TString message = TStringBuilder() << "Operation '" << newOp << "' may lead to unexpected results when applied to table with deleted rows: " << table; if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), strictDml, ctx)) { diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index ca775e2e1cf..2a0646bc7b0 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -44,6 +44,7 @@ private: virtual TStatus HandleKql(NNodes::TCallable node, TExprContext& ctx) = 0; virtual TStatus HandleExecDataQuery(NNodes::TKiExecDataQuery node, TExprContext& ctx) = 0; virtual TStatus HandleDataQuery(NNodes::TKiDataQuery node, TExprContext& ctx) = 0; + virtual TStatus HandleDataQueryBlock(NNodes::TKiDataQueryBlock node, TExprContext& ctx) = 0; virtual TStatus HandleEffects(NNodes::TKiEffects node, TExprContext& ctx) = 0; }; @@ -96,6 +97,14 @@ private: TMaybe<TString> View; }; +struct TKiDataQueryBlockSettings { + static constexpr std::string_view HasUncommittedChangesReadSettingName = "has_uncommitted_changes_read"sv; + bool HasUncommittedChangesRead = false; + + static TKiDataQueryBlockSettings Parse(const NNodes::TKiDataQueryBlock& node); + NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const; +}; + struct TKiExecDataQuerySettings { TMaybe<TString> Mode; TVector<NNodes::TCoNameValueTuple> Other; diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index 2be0482632b..76a9f4af183 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -1220,7 +1220,7 @@ private: return TStatus::Ok; } - virtual TStatus HandleDataQuery(NNodes::TKiDataQuery node, TExprContext& ctx) override { + virtual TStatus HandleDataQueryBlock(NNodes::TKiDataQueryBlock node, TExprContext& ctx) override { if (!EnsureWorldType(node.Effects().Ref(), ctx)) { return TStatus::Error; } @@ -1252,6 +1252,23 @@ private: return TStatus::Ok; } + virtual TStatus HandleDataQuery(NNodes::TKiDataQuery node, TExprContext& ctx) override { + TTypeAnnotationNode::TListType resultTypes; + for (const auto& block : node.Blocks()) { + auto blockType = block.Ref().GetTypeAnn(); + if (!EnsureTupleType(block.Pos(), *blockType, ctx)) { + return TStatus::Error; + } + + for (const auto& resultType : blockType->Cast<TTupleExprType>()->GetItems()) { + resultTypes.push_back(resultType); + } + } + + node.Ptr()->SetTypeAnn(ctx.MakeType<TTupleExprType>(resultTypes)); + return TStatus::Ok; + } + virtual TStatus HandleExecDataQuery(NNodes::TKiExecDataQuery node, TExprContext& ctx) override { if (!EnsureWorldType(node.World().Ref(), ctx)) { return TStatus::Error; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 89535b80349..99cccc6589c 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -89,6 +89,10 @@ inline bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikim } } + if (config.FeatureFlags.GetEnableKqpImmediateEffects() && physicalQuery.GetHasUncommittedChangesRead()) { + return true; + } + // We don't want snapshot when there are effects at the moment, // because it hurts performance when there are multiple single-shard // reads and a single distributed commit. Taking snapshot costs @@ -768,9 +772,11 @@ public: std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) { auto isolationLevel = *txCtx->EffectiveIsolationLevel; bool strictDml = Config->StrictDml.Get(Settings.Cluster).GetOrElse(false); + bool enableImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects(); TExprContext ctx; - bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel, strictDml, EKikimrQueryType::Dml, ctx); + bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel, strictDml, + enableImmediateEffects, EKikimrQueryType::Dml, ctx); return {success, ctx.IssueManager.GetIssues()}; } @@ -1086,30 +1092,37 @@ public: std::shared_ptr<const NKqpProto::TKqpPhyTx> tx; if (QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) { - tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>( QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx)); + tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx)); } - while (tx && tx->GetHasEffects()) { - if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) { - ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) - << "Failed to mix queries with old- and new- engines"; - } - LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); + if (!Config->FeatureFlags.GetEnableKqpImmediateEffects()) { + while (tx && tx->GetHasEffects()) { + YQL_ENSURE(txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))); + LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); - if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { - ++QueryState->CurrentTx; - tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, - &phyQuery.GetTransactions(QueryState->CurrentTx)); - } else { - tx = nullptr; - break; + if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { + ++QueryState->CurrentTx; + tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, + &phyQuery.GetTransactions(QueryState->CurrentTx)); + } else { + tx = nullptr; + break; + } } } + if (!CheckTransacionLocks() || !CheckTopicOperations()) { return; } - bool commit = QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1; + bool commit = false; + if (QueryState->Commit && Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) { + // every phy tx should acquire LockTxId, so commit is sent separately at the end + commit = QueryState->CurrentTx >= phyQuery.TransactionsSize(); + } else if (QueryState->Commit) { + commit = QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1; + } + if (tx || commit) { bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit); if (!replied) { @@ -1164,10 +1177,11 @@ public: } request.Transactions.emplace_back(tx, PrepareParameters(*tx)); + txCtx.HasImmediateEffects = txCtx.HasImmediateEffects || tx->GetHasEffects(); } else { YQL_ENSURE(commit); - if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) { + if (!txCtx.TxHasEffects() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) { ReplySuccess(); return true; } @@ -1191,7 +1205,7 @@ public: } if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasReadOperations()) { - request.ValidateLocks = !(txCtx.GetSnapshot().IsValid() && txCtx.DeferredEffects.Empty()) || + request.ValidateLocks = !txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() || txCtx.TopicOperations.HasReadOperations(); request.EraseLocks = true; @@ -1262,7 +1276,7 @@ public: const auto& locks = result.GetLocks(); auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx); if (!success) { - if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) { + if (!txCtx->GetSnapshot().IsValid() || txCtx->TxHasEffects()) { ReplyQueryError(Ydb::StatusIds::ABORTED, "Error while locks merge", MessageFromIssues(issues)); return false; diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index 73340514207..c253e526ae2 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -89,7 +89,7 @@ bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& return false; } else { txCtx.Locks.MarkBroken(issues.back()); - if (!txCtx.DeferredEffects.Empty()) { + if (txCtx.TxHasEffects()) { txCtx.Locks.ReportIssues(ctx); return false; } diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index e4bf301cec0..913ade9e977 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -134,6 +134,10 @@ public: return DeferredEffects.Add(std::move(physicalTx), std::move(params)); } + bool TxHasEffects() const { + return HasImmediateEffects || !DeferredEffects.Empty(); + } + const IKqpGateway::TKqpSnapshot& GetSnapshot() const { return SnapshotHandle.Snapshot; } @@ -171,6 +175,7 @@ public: DeferredEffects.Clear(); ParamsState = MakeIntrusive<TParamsState>(); SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot; + HasImmediateEffects = false; } TKqpTransactionInfo GetInfo() const; @@ -225,6 +230,7 @@ public: TKqpTxLocks Locks; TDeferredEffects DeferredEffects; + bool HasImmediateEffects = false; NTopic::TTopicOperations TopicOperations; TIntrusivePtr<TParamsState> ParamsState; diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index dd327e67366..bf02c9e80d2 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -99,6 +99,16 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) { ServerSettings->SetFrFactory(&UdfFrFactory); ServerSettings->SetEnableNotNullColumns(true); ServerSettings->SetEnableMoveIndex(true); + + if (settings.FeatureFlags.GetEnableKqpImmediateEffects()) { + Tests::TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + ServerSettings->SetControls(controls); + } + if (settings.LogStream) ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream)); diff --git a/ydb/core/kqp/ut/kqp_effects_ut.cpp b/ydb/core/kqp/ut/kqp_effects_ut.cpp index 445136b7014..493699ed3c8 100644 --- a/ydb/core/kqp/ut/kqp_effects_ut.cpp +++ b/ydb/core/kqp/ut/kqp_effects_ut.cpp @@ -512,6 +512,227 @@ Y_UNIT_TEST_SUITE(KqpEffects) { UNIT_ASSERT_VALUES_EQUAL(reads[0]["type"], "Lookup"); UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3); } + + Y_UNIT_TEST(ImmediateInsert) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE `/Root/TestImmediateInsert` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES + (1u, "One"), + (2u, "Two"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM `/Root/TestImmediateInsert`; + INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES + (3u, "Three"), + (4u, "Four"); + + SELECT * FROM `/Root/TestImmediateInsert`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]]; + [[3u];["Three"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES (5u, "Five"); + INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES (6u, "Six"); + INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES (7u, "Seven"); + + SELECT * FROM `/Root/TestImmediateInsert`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]]; + [[3u];["Three"]]; + [[4u];["Four"]]; + [[5u];["Five"]]; + [[6u];["Six"]]; + [[7u];["Seven"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ImmediateUpdate) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE `/Root/TestImmediateUpdate` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO `/Root/TestImmediateUpdate` (Key, Value) VALUES + (1u, "One"), + (2u, "Two"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM `/Root/TestImmediateUpdate`; + UPDATE `/Root/TestImmediateUpdate` ON (Key, Value) VALUES + (1u, "Updated1"), + (2u, "Updated2"); + + SELECT * FROM `/Root/TestImmediateUpdate`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([ + [[1u];["Updated1"]]; + [[2u];["Updated2"]] + ])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + UPDATE `/Root/TestImmediateUpdate` ON (Key, Value) VALUES + (1u, "Updated3"), + (2u, "Updated4"); + + UPDATE `/Root/TestImmediateUpdate` ON (Key, Value) VALUES + (1u, "Updated5"); + + SELECT * FROM `/Root/TestImmediateUpdate`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["Updated5"]]; + [[2u];["Updated4"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(ImmediateDelete) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE `/Root/TestImmediateDelete` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO `/Root/TestImmediateDelete` (Key, Value) VALUES + (1u, "One"), + (2u, "Two"), + (3u, "Three"), + (4u, "Four"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM `/Root/TestImmediateDelete`; + DELETE FROM `/Root/TestImmediateDelete` WHERE Key = 4; + + SELECT * FROM `/Root/TestImmediateDelete`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]]; + [[3u];["Three"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]]; + [[3u];["Three"]] + ])", FormatResultSetYson(result.GetResultSet(1))); + } + + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + DELETE FROM `/Root/TestImmediateDelete` WHERE Key > 2; + DELETE FROM `/Root/TestImmediateDelete` WHERE Key < 2; + + SELECT * FROM `/Root/TestImmediateDelete`; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[2u];["Two"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 60e7fe2487d..ad2cf7f7643 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -747,6 +747,7 @@ message TFeatureFlags { optional bool EnableChangefeedInitialScan = 77 [default = false]; optional bool EnableKqpScanQuerySourceRead = 78 [default = false]; optional bool EnableDynamicNodeAuthorization = 79 [default = false]; + optional bool EnableKqpImmediateEffects = 80 [default = false]; } diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 54e6603fd28..c049b2cd9ab 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -337,4 +337,5 @@ message TKqpPhyQuery { // StrictDml constraints info repeated TKqpTableOp TableOps = 7; repeated TKqpTableInfo TableInfos = 8; + bool HasUncommittedChangesRead = 9; } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 9e57bfa874a..85a086c269a 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -41,6 +41,7 @@ public: FEATURE_FLAG_SETTER(EnableArrowFormatAtDatashard) FEATURE_FLAG_SETTER(EnableGrpcAudit) FEATURE_FLAG_SETTER(EnableChangefeedInitialScan) + FEATURE_FLAG_SETTER(EnableKqpImmediateEffects) TDerived& SetEnableMvcc(std::optional<bool> value) { if (value) { |