aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2022-11-18 18:38:39 +0300
committerulya-sidorina <yulia@ydb.tech>2022-11-18 18:38:39 +0300
commitea58f4f34a9a7a66d041931c3f91003f41f6d83c (patch)
tree5dda86b2a35fbea7ae6c16aed8189f02f5d20757
parent20ba7932108cc6305f809d0011ea19b3fdb20d53 (diff)
downloadydb-ea58f4f34a9a7a66d041931c3f91003f41f6d83c.tar.gz
support uncommitted changes in KQP
-rw-r--r--ydb/core/kqp/compile/kqp_compile.cpp12
-rw-r--r--ydb/core/kqp/compile/kqp_compile.h2
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp4
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json14
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp18
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp32
-rw-r--r--ydb/core/kqp/host/kqp_type_ann.cpp14
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp54
-rw-r--r--ydb/core/kqp/opt/kqp_opt_effects.cpp97
-rw-r--r--ydb/core/kqp/opt/kqp_opt_kql.cpp63
-rw-r--r--ydb/core/kqp/opt/kqp_opt_phy_check.cpp10
-rw-r--r--ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp165
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasink.cpp11
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_datasource.cpp18
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp7
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_expr_nodes.json17
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp172
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.cpp28
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h8
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h9
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_type_ann.cpp19
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp52
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.h6
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp10
-rw-r--r--ydb/core/kqp/ut/kqp_effects_ut.cpp221
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/kqp_physical.proto1
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
29 files changed, 782 insertions, 286 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp
index e12ceca2c77..09f3ed5a430 100644
--- a/ydb/core/kqp/compile/kqp_compile.cpp
+++ b/ydb/core/kqp/compile/kqp_compile.cpp
@@ -355,7 +355,7 @@ public:
Alloc.Acquire();
}
- bool CompilePhysicalQuery(const TKqpPhysicalQuery& query, const TKiOperationList& tableOps,
+ bool CompilePhysicalQuery(const TKqpPhysicalQuery& query, const TKiDataQuery& dataQuery,
NKqpProto::TKqpPhyQuery& queryProto, TExprContext& ctx) final
{
TGuard<TScopedAlloc> allocGuard(Alloc);
@@ -364,7 +364,15 @@ public:
YQL_ENSURE(querySettings.Type);
queryProto.SetType(GetPhyQueryType(*querySettings.Type));
- auto ops = TableOperationsToProto(tableOps, ctx);
+ for (const auto& queryBlock : dataQuery.Blocks()) {
+ auto queryBlockSettings = TKiDataQueryBlockSettings::Parse(queryBlock);
+ if (queryBlockSettings.HasUncommittedChangesRead) {
+ queryProto.SetHasUncommittedChangesRead(true);
+ break;
+ }
+ }
+
+ auto ops = TableOperationsToProto(dataQuery.Operations(), ctx);
for (auto& op : ops) {
const auto tableName = op.GetTable();
auto operation = static_cast<TYdbOperation>(op.GetOperation());
diff --git a/ydb/core/kqp/compile/kqp_compile.h b/ydb/core/kqp/compile/kqp_compile.h
index 774bb3cc533..f9ed22a3b12 100644
--- a/ydb/core/kqp/compile/kqp_compile.h
+++ b/ydb/core/kqp/compile/kqp_compile.h
@@ -12,7 +12,7 @@ namespace NKqp {
class IKqpQueryCompiler : public TThrRefBase {
public:
virtual bool CompilePhysicalQuery(const NYql::NNodes::TKqpPhysicalQuery& query,
- const NYql::NNodes::TKiOperationList& tableOps, NKqpProto::TKqpPhyQuery& queryProto,
+ const NYql::NNodes::TKiDataQuery& dataQuery, NKqpProto::TKqpPhyQuery& queryProto,
NYql::TExprContext& ctx) = 0;
};
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index 40aefd37de6..bc8c4f62777 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -1271,7 +1271,7 @@ private:
<< ", locks: " << dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString());
TEvDataShard::TEvProposeTransaction* ev;
- if (Snapshot.IsValid() && ReadOnlyTx) {
+ if (Snapshot.IsValid() && (ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects())) {
ev = new TEvDataShard::TEvProposeTransaction(
NKikimrTxDataShard::TX_KIND_DATA,
SelfId(),
@@ -1574,7 +1574,7 @@ private:
break;
}
- if (ReadOnlyTx && Request.Snapshot.IsValid()) {
+ if ((ReadOnlyTx || AppData()->FeatureFlags.GetEnableKqpImmediateEffects()) && Request.Snapshot.IsValid()) {
// Snapshot reads are always immediate
Snapshot = Request.Snapshot;
ImmediateTx = true;
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 8b323d48b71..7b084488c1b 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -321,7 +321,7 @@
"ListBase": "TKqlQueryResult"
},
{
- "Name": "TKqlQuery",
+ "Name": "TKqlQueryBlock",
"Base": "TExprBase",
"Match": {"Type": "Tuple"},
"Children": [
@@ -330,6 +330,18 @@
]
},
{
+ "Name": "TKqlQueryBlockList",
+ "ListBase": "TKqlQueryBlock"
+ },
+ {
+ "Name": "TKqlQuery",
+ "Base": "TExprBase",
+ "Match": {"Type": "Tuple"},
+ "Children": [
+ {"Index": 0, "Name": "Blocks", "Type": "TKqlQueryBlockList"}
+ ]
+ },
+ {
"Name": "TKqpEffects",
"VarArgBase": "TExprBase",
"Match": {"Type": "Callable", "Name": "KqpEffects"}
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 12262825433..6865597f335 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -722,8 +722,9 @@ public:
case EKikimrQueryType::YqlScript:
if (useScanQuery) {
ui64 rowsLimit = 0;
- if (!dataQuery.Results().Empty()) {
- rowsLimit = FromString<ui64>(dataQuery.Results().Item(0).RowsLimit());
+ if (!dataQuery.Blocks().Empty() && !dataQuery.Blocks().Item(0).Results().Empty()) {
+ const auto& queryBlock = dataQuery.Blocks().Item(0);
+ rowsLimit = FromString<ui64>(queryBlock.Results().Item(0).RowsLimit());
}
if (SessionCtx->Query().PrepareOnly) {
@@ -800,12 +801,19 @@ private:
return *settings.UseScanQuery;
}
- if (query.Effects().ArgCount() > 0) {
+ if (query.Blocks().Size() != 1) {
+ // Don't use ScanQuery for muiltiple blocks query
+ return false;
+ }
+
+ const auto& queryBlock = query.Blocks().Item(0);
+
+ if (queryBlock.Effects().ArgCount() > 0) {
// Do not use ScanQuery for queries with effects.
return false;
}
- if (query.Results().Size() != 1) {
+ if (queryBlock.Results().Size() != 1) {
// Do not use ScanQuery for queries with multiple result sets.
return false;
}
@@ -829,7 +837,7 @@ private:
bool hasIndexReads = false;
bool hasJoins = false;
- VisitExpr(query.Results().Ptr(), [&hasIndexReads, &hasJoins] (const TExprNode::TPtr& exprNode) {
+ VisitExpr(queryBlock.Results().Ptr(), [&hasIndexReads, &hasJoins] (const TExprNode::TPtr& exprNode) {
auto node = TExprBase(exprNode);
if (auto read = node.Maybe<TKiReadTable>()) {
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index e71a4fb8408..6dc2b053a0e 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -142,12 +142,19 @@ public:
TKiDataQuery dataQuery(query);
- if (dataQuery.Results().Size() != 1) {
+ if (dataQuery.Blocks().Size() != 1) {
+ ctx.AddError(YqlIssue(ctx.GetPosition(dataQuery.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED,
+ "Scan query should have single query block."));
+ return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
+ }
+
+ const auto& queryBlock = dataQuery.Blocks().Item(0);
+ if (queryBlock.Results().Size() != 1) {
ctx.AddError(YqlIssue(ctx.GetPosition(dataQuery.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED,
"Scan query should have a single result set."));
return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
}
- if (dataQuery.Effects().ArgCount() > 0) {
+ if (queryBlock.Effects().ArgCount() > 0) {
ctx.AddError(YqlIssue(ctx.GetPosition(dataQuery.Pos()), TIssuesIds::KIKIMR_PRECONDITION_FAILED,
"Scan query cannot have data modifications."));
return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
@@ -166,15 +173,17 @@ private:
auto* queryCtx = TransformCtx->QueryCtx.Get();
if (queryCtx->Type == EKikimrQueryType::Dml) {
- ui32 resultsCount = dataQuery.Results().Size();
- for (ui32 i = 0; i < resultsCount; ++i) {
- auto& result = *queryCtx->PreparingQuery->AddResults();
- result.SetKqlIndex(0);
- result.SetResultIndex(i);
- for (const auto& column : dataQuery.Results().Item(i).Columns()) {
- *result.AddColumnHints() = column.Value();
+ ui32 resultsCount = 0;
+ for (const auto& block : dataQuery.Blocks()) {
+ for (ui32 i = 0; i < block.Results().Size(); ++i, ++resultsCount) {
+ auto& result = *queryCtx->PreparingQuery->AddResults();
+ result.SetKqlIndex(0);
+ result.SetResultIndex(resultsCount);
+ for (const auto& column : block.Results().Item(i).Columns()) {
+ *result.AddColumnHints() = column.Value();
+ }
+ result.SetRowsLimit(FromString<ui64>(block.Results().Item(i).RowsLimit()));
}
- result.SetRowsLimit(FromString<ui64>(dataQuery.Results().Item(i).RowsLimit()));
}
} else {
// scan query
@@ -251,8 +260,7 @@ private:
auto& preparedQuery = *TransformCtx->QueryCtx->PreparingQuery;
TKqpPhysicalQuery physicalQuery(transformedQuery);
auto compiler = CreateKqpQueryCompiler(Cluster, OptimizeCtx->Tables, FuncRegistry);
- auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQuery.Operations(),
- *preparedQuery.MutablePhysicalQuery(), ctx);
+ auto ret = compiler->CompilePhysicalQuery(physicalQuery, dataQuery, *preparedQuery.MutablePhysicalQuery(), ctx);
if (!ret) {
ctx.AddError(TIssue(ctx.GetPosition(query->Pos()), "Failed to compile physical query."));
return MakeKikimrResultHolder(ResultFromErrors<IKqpHost::TQueryResult>(ctx.IssueManager.GetIssues()));
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index e32e895883f..6033b54279a 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -1363,12 +1363,14 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckQueryTransformer() {
YQL_ENSURE(TMaybeNode<TKqlQuery>(input));
auto query = TKqlQuery(input);
- for (const auto& result : query.Results()) {
- if (!EnsureTupleSize(result.Ref(), 2, ctx)) {
- return TStatus::Error;
- }
- if (!EnsureListType(result.Value().Ref(), ctx)) {
- return TStatus::Error;
+ for (const auto& block : query.Blocks()) {
+ for (const auto& result : block.Results()) {
+ if (!EnsureTupleSize(result.Ref(), 2, ctx)) {
+ return TStatus::Error;
+ }
+ if (!EnsureListType(result.Value().Ref(), ctx)) {
+ return TStatus::Error;
+ }
}
}
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index b40b1e7b5be..67681c51db6 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -473,39 +473,41 @@ public:
}
TVector<TExprBase> queryResults;
- if (!query.Results().Empty()) {
- auto tx = BuildTx(query.Results().Ptr(), ctx, false);
- if (!tx) {
- return TStatus::Error;
- }
+ for (const auto& block : query.Blocks()) {
+ if (!block.Results().Empty()) {
+ auto tx = BuildTx(block.Results().Ptr(), ctx, false);
+ if (!tx) {
+ return TStatus::Error;
+ }
- BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
+ BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
- for (ui32 i = 0; i < query.Results().Size(); ++i) {
- const auto& result = query.Results().Item(i);
- auto binding = Build<TKqpTxResultBinding>(ctx, query.Pos())
- .Type(ExpandType(query.Pos(), *result.Value().Ref().GetTypeAnn(), ctx))
- .TxIndex()
- .Build(ToString(BuildCtx->PhysicalTxs.size() - 1))
- .ResultIndex()
- .Build(ToString(i))
- .Done();
+ for (ui32 i = 0; i < block.Results().Size(); ++i) {
+ const auto& result = block.Results().Item(i);
+ auto binding = Build<TKqpTxResultBinding>(ctx, block.Pos())
+ .Type(ExpandType(block.Pos(), *result.Value().Ref().GetTypeAnn(), ctx))
+ .TxIndex()
+ .Build(ToString(BuildCtx->PhysicalTxs.size() - 1))
+ .ResultIndex()
+ .Build(ToString(i))
+ .Done();
- queryResults.emplace_back(std::move(binding));
+ queryResults.emplace_back(std::move(binding));
+ }
}
- }
- if (!query.Effects().Empty()) {
- auto tx = BuildTx(query.Effects().Ptr(), ctx, /* isPrecompute */ false);
- if (!tx) {
- return TStatus::Error;
- }
+ if (!block.Effects().Empty()) {
+ auto tx = BuildTx(block.Effects().Ptr(), ctx, /* isPrecompute */ false);
+ if (!tx) {
+ return TStatus::Error;
+ }
- if (!CheckEffectsTx(tx.Cast(), ctx)) {
- return TStatus::Error;
- }
+ if (!CheckEffectsTx(tx.Cast(), ctx)) {
+ return TStatus::Error;
+ }
- BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
+ BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
+ }
}
TKqpPhyQuerySettings querySettings;
diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp
index 48fe4039bf6..27ba20e8ea4 100644
--- a/ydb/core/kqp/opt/kqp_opt_effects.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp
@@ -415,59 +415,70 @@ template <bool GroupEffectsByTable>
TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx)
{
- TVector<TExprBase> builtEffects;
+ TVector<TKqlQueryBlock> queryBlocks;
+ queryBlocks.reserve(query.Blocks().Size());
- if constexpr (GroupEffectsByTable) {
- TMap<TStringBuf, TVector<TKqlTableEffect>> tableEffectsMap;
- for (const auto& maybeEffect: query.Effects()) {
- if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
- for (const auto effect : maybeList.Cast()) {
- YQL_ENSURE(effect.Maybe<TKqlTableEffect>());
- auto tableEffect = effect.Cast<TKqlTableEffect>();
+ for (const auto& block : query.Blocks()) {
+ TVector<TExprBase> builtEffects;
+
+ if constexpr(GroupEffectsByTable) {
+ TMap<TStringBuf, TVector<TKqlTableEffect>> tableEffectsMap;
+ for (const auto& maybeEffect : block.Effects()) {
+ if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
+ for (const auto effect : maybeList.Cast()) {
+ YQL_ENSURE(effect.Maybe<TKqlTableEffect>());
+ auto tableEffect = effect.Cast<TKqlTableEffect>();
+
+ tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect);
+ }
+ } else {
+ YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>());
+ auto tableEffect = maybeEffect.Cast<TKqlTableEffect>();
tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect);
}
- } else {
- YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>());
- auto tableEffect = maybeEffect.Cast<TKqlTableEffect>();
-
- tableEffectsMap[tableEffect.Table().Path()].push_back(tableEffect);
}
- }
- for (const auto& pair: tableEffectsMap) {
- if (!BuildEffects(query.Pos(), pair.second, ctx, kqpCtx, builtEffects)) {
- return {};
+ for (const auto& pair: tableEffectsMap) {
+ if (!BuildEffects(block.Pos(), pair.second, ctx, kqpCtx, builtEffects)) {
+ return {};
+ }
}
- }
- } else {
- builtEffects.reserve(query.Effects().Size() * 2);
-
- for (const auto& maybeEffect : query.Effects()) {
- if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
- for (const auto effect : maybeList.Cast()) {
- YQL_ENSURE(effect.Maybe<TKqlTableEffect>());
- auto tableEffect = effect.Cast<TKqlTableEffect>();
+ } else {
+ builtEffects.reserve(block.Effects().Size() * 2);
+
+ for (const auto &maybeEffect : block.Effects()) {
+ if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
+ for (const auto effect : maybeList.Cast()) {
+ YQL_ENSURE(effect.Maybe<TKqlTableEffect>());
+ auto tableEffect = effect.Cast<TKqlTableEffect>();
+
+ if (!BuildEffects(block.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) {
+ return {};
+ }
+ }
+ } else {
+ YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>());
+ auto tableEffect = maybeEffect.Cast<TKqlTableEffect>();
- if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) {
+ if (!BuildEffects(block.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) {
return {};
}
}
- } else {
- YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>());
- auto tableEffect = maybeEffect.Cast<TKqlTableEffect>();
-
- if (!BuildEffects(query.Pos(), {tableEffect}, ctx, kqpCtx, builtEffects)) {
- return {};
- }
}
}
+
+ queryBlocks.emplace_back(Build<TKqlQueryBlock>(ctx, block.Pos())
+ .Results(block.Results())
+ .Effects()
+ .Add(builtEffects)
+ .Build()
+ .Done());
}
return Build<TKqlQuery>(ctx, query.Pos())
- .Results(query.Results())
- .Effects()
- .Add(builtEffects)
+ .Blocks()
+ .Add(queryBlocks)
.Build()
.Done();
}
@@ -487,11 +498,13 @@ TAutoPtr<IGraphTransformer> CreateKqpQueryEffectsTransformer(const TIntrusivePtr
bool requireBuild = false;
bool hasBuilt = false;
- for (const auto& effect : query.Effects()) {
- if (!effect.Maybe<TDqOutput>()) {
- requireBuild = true;
- } else {
- hasBuilt = true;
+ for (const auto& block : query.Blocks()) {
+ for (const auto& effect : block.Effects()) {
+ if (!effect.Maybe<TDqOutput>()) {
+ requireBuild = true;
+ } else {
+ hasBuilt = true;
+ }
}
}
diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp
index 93c7bea5af6..0fed2b1100b 100644
--- a/ydb/core/kqp/opt/kqp_opt_kql.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp
@@ -748,40 +748,51 @@ TIntrusivePtr<TKikimrTableMetadata> GetIndexMetadata(const TKqlReadTableIndex& r
TMaybe<TKqlQuery> BuildKqlQuery(TKiDataQuery query, const TKikimrTablesData& tablesData, TExprContext& ctx,
bool withSystemColumns, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx)
{
- TVector<TExprBase> kqlEffects;
- for (const auto& effect : query.Effects()) {
- if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) {
- auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData);
- kqlEffects.push_back(result);
- }
+ TVector<TKqlQueryBlock> queryBlocks;
+ queryBlocks.reserve(query.Blocks().Size());
+
+ for (const auto& block : query.Blocks()) {
+ TVector <TExprBase> kqlEffects;
+ for (const auto& effect : block.Effects()) {
+ if (auto maybeWrite = effect.Maybe<TKiWriteTable>()) {
+ auto result = HandleWriteTable(maybeWrite.Cast(), ctx, tablesData);
+ kqlEffects.push_back(result);
+ }
- if (auto maybeUpdate = effect.Maybe<TKiUpdateTable>()) {
- auto results = HandleUpdateTable(maybeUpdate.Cast(), ctx, tablesData, withSystemColumns, kqpCtx);
- kqlEffects.insert(kqlEffects.end(), results.begin(), results.end());
+ if (auto maybeUpdate = effect.Maybe<TKiUpdateTable>()) {
+ auto results = HandleUpdateTable(maybeUpdate.Cast(), ctx, tablesData, withSystemColumns, kqpCtx);
+ kqlEffects.insert(kqlEffects.end(), results.begin(), results.end());
+ }
+
+ if (auto maybeDelete = effect.Maybe<TKiDeleteTable>()) {
+ auto results = HandleDeleteTable(maybeDelete.Cast(), ctx, tablesData, withSystemColumns, kqpCtx);
+ kqlEffects.insert(kqlEffects.end(), results.begin(), results.end());
+ }
}
- if (auto maybeDelete = effect.Maybe<TKiDeleteTable>()) {
- auto results = HandleDeleteTable(maybeDelete.Cast(), ctx, tablesData, withSystemColumns, kqpCtx);
- kqlEffects.insert(kqlEffects.end(), results.begin(), results.end());
+ TVector <TKqlQueryResult> kqlResults;
+ kqlResults.reserve(block.Results().Size());
+ for (const auto& kiResult : block.Results()) {
+ kqlResults.emplace_back(
+ Build<TKqlQueryResult>(ctx, kiResult.Pos())
+ .Value(kiResult.Value())
+ .ColumnHints(kiResult.Columns())
+ .Done());
}
- }
- TVector<TKqlQueryResult> kqlResults;
- kqlResults.reserve(query.Results().Size());
- for (const auto& kiResult : query.Results()) {
- kqlResults.emplace_back(
- Build<TKqlQueryResult>(ctx, kiResult.Pos())
- .Value(kiResult.Value())
- .ColumnHints(kiResult.Columns())
- .Done());
+ queryBlocks.emplace_back(Build<TKqlQueryBlock>(ctx, query.Pos())
+ .Results()
+ .Add(kqlResults)
+ .Build()
+ .Effects()
+ .Add(kqlEffects)
+ .Build()
+ .Done());
}
TKqlQuery kqlQuery = Build<TKqlQuery>(ctx, query.Pos())
- .Results()
- .Add(kqlResults)
- .Build()
- .Effects()
- .Add(kqlEffects)
+ .Blocks()
+ .Add(queryBlocks)
.Build()
.Done();
diff --git a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
index c34a094ff3e..b2f4a0a42cb 100644
--- a/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_phy_check.cpp
@@ -20,10 +20,12 @@ TAutoPtr<IGraphTransformer> CreateKqpCheckPhysicalQueryTransformer() {
auto query = TKqlQuery(input);
YQL_ENSURE(query.Ref().GetTypeAnn());
- for (const auto& result : query.Effects()) {
- if (!result.Maybe<TDqOutput>()) {
- ctx.AddError(TIssue(ctx.GetPosition(result.Pos()), "Failed to build query effects."));
- return TStatus::Error;
+ for (const auto& block : query.Blocks()) {
+ for (const auto& effect : block.Effects()) {
+ if (!effect.Maybe<TDqOutput>()) {
+ ctx.AddError(TIssue(ctx.GetPosition(effect.Pos()), "Failed to build query effects."));
+ return TStatus::Error;
+ }
}
}
diff --git a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
index 5c4e648c8e6..784d6ed5052 100644
--- a/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_phy_finalize.cpp
@@ -22,10 +22,6 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP
TExprBase inputExpr(input);
auto query = inputExpr.Cast<TKqlQuery>();
- if (query.Results().Empty()) {
- return TStatus::Ok;
- }
-
auto predicate = [](const NYql::TExprNode::TPtr& node) {
return TMaybeNode<TDqPhyPrecompute>(node).IsValid();
};
@@ -39,41 +35,48 @@ TStatus KqpBuildPureExprStagesResult(const TExprNode::TPtr& input, TExprNode::TP
return precompute != nullptr;
};
- // Currently we compute all query results in the last physical transaction, so we cannot
- // precompute them early if no explicit DqPhyPrecompute is specified.
- // Removing precompute in queries with multiple physical tx can lead to additional computations.
- // More proper way to fix this is to allow partial result computation in early physical
- // transactions.
- bool omitResultPrecomputes = true;
- for (const auto& result: query.Results()) {
- if (result.Value().Maybe<TDqPhyPrecompute>()) {
+ TNodeOnNodeOwnedMap replaces;
+ for (const auto& block : query.Blocks()) {
+
+ if (block.Results().Empty()) {
continue;
}
- if (!hasPrecomputes(result.Value())) {
- omitResultPrecomputes = false;
- break;
+ // Currently we compute all query results in the last physical transaction, so we cannot
+ // precompute them early if no explicit DqPhyPrecompute is specified.
+ // Removing precompute in queries with multiple physical tx can lead to additional computations.
+ // More proper way to fix this is to allow partial result computation in early physical
+ // transactions.
+ bool omitResultPrecomputes = true;
+ for (const auto& result : block.Results()) {
+ if (result.Value().Maybe<TDqPhyPrecompute>()) {
+ continue;
+ }
+
+ if (!hasPrecomputes(result.Value())) {
+ omitResultPrecomputes = false;
+ break;
+ }
}
- }
- for (const auto& effect: query.Effects()) {
- if (!hasPrecomputes(effect)) {
- omitResultPrecomputes = false;
- break;
+ for (const auto& effect: block.Effects()) {
+ if (!hasPrecomputes(effect)) {
+ omitResultPrecomputes = false;
+ break;
+ }
}
- }
- TNodeOnNodeOwnedMap replaces;
- for (const auto& queryResult: query.Results()) {
- TExprBase node(queryResult.Value());
-
- // TODO: Missing support for DqCnValue results in scan queries
- if (node.Maybe<TDqPhyPrecompute>() && omitResultPrecomputes && !kqpCtx.IsScanQuery()) {
- replaces[node.Raw()] = node.Cast<TDqPhyPrecompute>().Connection().Ptr();
- } else {
- auto result = DqBuildPureExprStage(node, ctx);
- if (result.Raw() != node.Raw()) {
- YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of pure query #" << node.Raw()->UniqueId();
- replaces[node.Raw()] = result.Ptr();
+ for (const auto& queryResult : block.Results()) {
+ TExprBase node(queryResult.Value());
+
+ // TODO: Missing support for DqCnValue results in scan queries
+ if (node.Maybe<TDqPhyPrecompute>() && omitResultPrecomputes && !kqpCtx.IsScanQuery()) {
+ replaces[node.Raw()] = node.Cast<TDqPhyPrecompute>().Connection().Ptr();
+ } else {
+ auto result = DqBuildPureExprStage(node, ctx);
+ if (result.Raw() != node.Raw()) {
+ YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of pure query #" << node.Raw()->UniqueId();
+ replaces[node.Raw()] = result.Ptr();
+ }
}
}
}
@@ -87,13 +90,15 @@ TStatus KqpBuildUnionResult(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
auto query = inputExpr.Cast<TKqlQuery>();
TNodeOnNodeOwnedMap replaces;
- for (const auto& queryResult: query.Results()) {
- TExprBase node(queryResult.Value());
+ for (const auto& block : query.Blocks()) {
+ for (const auto& queryResult: block.Results()) {
+ TExprBase node(queryResult.Value());
- auto result = DqBuildExtendStage(node, ctx);
- if (result.Raw() != node.Raw()) {
- YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of union #" << node.Raw()->UniqueId();
- replaces[node.Raw()] = result.Ptr();
+ auto result = DqBuildExtendStage(node, ctx);
+ if (result.Raw() != node.Raw()) {
+ YQL_CLOG(DEBUG, ProviderKqp) << "Building stage out of union #" << node.Raw()->UniqueId();
+ replaces[node.Raw()] = result.Ptr();
+ }
}
}
output = ctx.ReplaceNodes(TExprNode::TPtr(input), replaces);
@@ -107,15 +112,9 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
TExprBase inputExpr(input);
auto query = inputExpr.Cast<TKqlQuery>();
- const size_t resultsCount = query.Results().Size();
-
- if (resultsCount <= 1) {
- return TStatus::Ok;
- }
-
struct TKqlQueryResultInfo {
TMaybeNode<TKqlQueryResult> Node;
- TVector<size_t> Indexes;
+ TVector<std::pair<size_t, size_t>> Indexes;
TMaybeNode<TDqStage> ReplicateStage;
size_t NextIndex = 0;
};
@@ -123,20 +122,23 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
TNodeMap<TKqlQueryResultInfo> kqlQueryResults;
bool hasDups = false;
- for (size_t i = 0; i < resultsCount; ++i) {
- TKqlQueryResult result = query.Results().Item(i);
+ for (size_t blockId = 0; blockId < query.Blocks().Size(); ++blockId) {
+ TKqlQueryBlock block = query.Blocks().Item(blockId);
+ for (size_t resultId = 0; resultId < block.Results().Size(); ++resultId) {
+ TKqlQueryResult result = block.Results().Item(resultId);
- if (!result.Value().Maybe<TDqConnection>()) {
- return TStatus::Ok;
- }
+ if (!result.Value().Maybe<TDqConnection>()) {
+ return TStatus::Ok;
+ }
- auto& info = kqlQueryResults[result.Raw()];
- if (info.Indexes.empty()) {
- info.Node = result;
- } else {
- hasDups = true;
+ auto& info = kqlQueryResults[result.Raw()];
+ if (info.Indexes.empty()) {
+ info.Node = result;
+ } else {
+ hasDups = true;
+ }
+ info.Indexes.push_back({blockId, resultId});
}
- info.Indexes.push_back(i);
}
if (!hasDups) {
@@ -168,32 +170,41 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
}
}
- TVector<TExprNode::TPtr> results(resultsCount);
-
- for (size_t i = 0; i < resultsCount; ++i) {
- auto& info = kqlQueryResults.at(query.Results().Item(i).Raw());
-
- if (info.Indexes.size() == 1) {
- results[i] = info.Node.Cast().Ptr();
- } else {
- results[i] = Build<TKqlQueryResult>(ctx, query.Pos())
- .Value<TDqCnUnionAll>()
- .Output()
- .Stage(info.ReplicateStage.Cast())
- .Index().Build(ToString(info.NextIndex))
+ TVector<TKqlQueryBlock> queryBlocks;
+ queryBlocks.reserve(query.Blocks().Size());
+ for (const auto& block : query.Blocks()) {
+ TVector<TExprNode::TPtr> results(block.Results().Size());
+ for (size_t i = 0; i < block.Results().Size(); ++i) {
+ auto& info = kqlQueryResults.at(block.Results().Item(i).Raw());
+
+ if (info.Indexes.size() == 1) {
+ results[i] = info.Node.Cast().Ptr();
+ } else {
+ results[i] = Build<TKqlQueryResult>(ctx, query.Pos())
+ .Value<TDqCnUnionAll>()
+ .Output()
+ .Stage(info.ReplicateStage.Cast())
+ .Index().Build(ToString(info.NextIndex))
+ .Build()
.Build()
- .Build()
- .ColumnHints(info.Node.Cast().ColumnHints())
- .Done().Ptr();
- info.NextIndex++;
+ .ColumnHints(info.Node.Cast().ColumnHints())
+ .Done().Ptr();
+ info.NextIndex++;
+ }
}
+
+ queryBlocks.emplace_back(Build<TKqlQueryBlock>(ctx, block.Pos())
+ .Results()
+ .Add(results)
+ .Build()
+ .Effects(block.Effects())
+ .Done());
}
output = Build<TKqlQuery>(ctx, query.Pos())
- .Results()
- .Add(results)
+ .Blocks()
+ .Add(queryBlocks)
.Build()
- .Effects(query.Effects())
.Done().Ptr();
return TStatus::Ok;
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
index 67bdcda3bf6..a8d17093569 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasink.cpp
@@ -234,6 +234,13 @@ private:
return TStatus::Ok;
}
+ TStatus HandleDataQueryBlock(TKiDataQueryBlock node, TExprContext& ctx) override {
+ Y_UNUSED(node);
+ Y_UNUSED(ctx);
+
+ return TStatus::Ok;
+ }
+
TStatus HandleDataQuery(TKiDataQuery node, TExprContext& ctx) override {
Y_UNUSED(ctx);
for (const auto& op : node.Operations()) {
@@ -713,6 +720,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
return HandleCommit(node.Cast(), ctx);
}
+ if (auto node = callable.Maybe<TKiDataQueryBlock>()) {
+ return HandleDataQueryBlock(node.Cast(), ctx);
+ }
+
if (auto node = callable.Maybe<TKiDataQuery>()) {
return HandleDataQuery(node.Cast(), ctx);
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
index c94eb2b2093..901e6f417dd 100644
--- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp
@@ -521,9 +521,16 @@ public:
auto exec = execQuery.Cast();
auto query = exec.Query();
- auto results = query.Results();
- auto result = results.Item(index);
+ ui32 blockId = 0;
+ ui32 startBlockIndex = 0;
+ while (blockId < query.Blocks().Size() && startBlockIndex + query.Blocks().Item(blockId).Results().Size() <= index) {
+ startBlockIndex += query.Blocks().Item(blockId).Results().Size();
+ ++blockId;
+ }
+ auto results = query.Blocks().Item(blockId).Results();
+
+ auto result = results.Item(index - startBlockIndex);
ui64 rowsLimit = ::FromString<ui64>(result.RowsLimit());
if (!rowsLimit) {
if (!fillSettings.RowsLimitPerWrite) {
@@ -546,12 +553,13 @@ public:
.RowsLimit().Build(ToString(rowsLimit))
.Done();
- auto newResults = ctx.ChangeChild(results.Ref(), index, newResult.Ptr());
+ auto newResults = ctx.ChangeChild(results.Ref(), index - startBlockIndex, newResult.Ptr());
+ auto newQueryBlock = ctx.ChangeChild(query.Blocks().Item(blockId).Ref(), 0, std::move(newResults));
+ auto newQueryBlocks = ctx.ChangeChild(query.Blocks().Ref(), blockId, std::move(newQueryBlock));
auto newQuery = Build<TKiDataQuery>(ctx, query.Pos())
.Operations(query.Operations())
- .Results(newResults)
- .Effects(query.Effects())
+ .Blocks(newQueryBlocks)
.Done();
auto newExec = Build<TKiExecDataQuery>(ctx, exec.Pos())
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index aad25b01820..55527734af4 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1312,6 +1312,7 @@ private:
NKikimrKqp::EIsolationLevel isolationLevel, TExprContext& ctx)
{
bool strictDml = SessionCtx->Config().StrictDml.Get(cluster).GetRef();
+ bool enableImmediateEffects = SessionCtx->Config().FeatureFlags.GetEnableKqpImmediateEffects();
auto queryType = SessionCtx->Query().Type;
TVector<NKqpProto::TKqpTableInfo> tableInfo;
@@ -1325,10 +1326,12 @@ private:
if (!SessionCtx->HasTx()) {
TKikimrTransactionContextBase emptyCtx;
- return emptyCtx.ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml, queryType, ctx);
+ return emptyCtx.ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml, enableImmediateEffects,
+ queryType, ctx);
}
- return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml, queryType, ctx);
+ return SessionCtx->Tx().ApplyTableOperations(tableOps, tableInfo, isolationLevel, strictDml,
+ enableImmediateEffects, queryType, ctx);
}
bool ApplyDdlOperation(const TString& cluster, TPositionHandle pos, const TString& table,
diff --git a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
index d96428ed0c6..fd9d0c06ab5 100644
--- a/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
+++ b/ydb/core/kqp/provider/yql_kikimr_expr_nodes.json
@@ -245,13 +245,26 @@
"Match": {"Type": "Callable", "Name": "KiEffects"}
},
{
+ "Name": "TKiDataQueryBlock",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "TKiDataQueryBlock"},
+ "Children": [
+ {"Index": 0, "Name": "Results", "Type": "TKiResultList"},
+ {"Index": 1, "Name": "Effects", "Type": "TKiEffects"},
+ {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList"}
+ ]
+ },
+ {
+ "Name": "TKiDataQueryBlockList",
+ "ListBase": "TKiDataQueryBlock"
+ },
+ {
"Name": "TKiDataQuery",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KiDataQuery"},
"Children": [
{"Index": 0, "Name": "Operations", "Type": "TKiOperationList"},
- {"Index": 1, "Name": "Results", "Type": "TKiResultList"},
- {"Index": 2, "Name": "Effects", "Type": "TKiEffects"}
+ {"Index": 1, "Name": "Blocks", "Type": "TKiDataQueryBlockList"}
]
},
{
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index 7526e060fa3..b312ff1eb7b 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -64,10 +64,16 @@ ui64 GetResultRowsLimit(const TResWriteBase& resWrite) {
}
struct TKiExploreTxResults {
+ struct TKiQueryBlock {
+ TVector<TExprBase> Results;
+ TVector<TExprBase> Effects;
+ THashSet<std::string_view> TablesWithEffects;
+ bool HasUncommittedChangesRead = false;
+ };
+
THashSet<const TExprNode*> Ops;
TVector<TExprBase> Sync;
- TVector<TExprBase> Results;
- TVector<TExprBase> Effects;
+ TVector<TKiQueryBlock> QueryBlocks;
TVector<TKiOperation> TableOperations;
bool HasExecute;
@@ -90,6 +96,44 @@ struct TKiExploreTxResults {
}
}
+ void AddEffect(const TExprBase& effect, std::string_view table) {
+ if (QueryBlocks.empty()) {
+ AddQueryBlock();
+ }
+
+ auto& curBlock = QueryBlocks.back();
+ curBlock.Effects.push_back(effect);
+ curBlock.TablesWithEffects.insert(table);
+ }
+
+ void AddResult(const TExprBase& result) {
+ if (QueryBlocks.empty()) {
+ AddQueryBlock();
+ }
+
+ auto& curBlock = QueryBlocks.back();
+ curBlock.Results.push_back(result);
+ }
+
+ bool HasEffects(std::string_view table) {
+ if (QueryBlocks.empty()) {
+ return false;
+ }
+
+ auto& curBlock = QueryBlocks.back();
+ return curBlock.TablesWithEffects.contains(table);
+ }
+
+ void AddQueryBlock() {
+ QueryBlocks.emplace_back();
+ }
+
+ void SetBlockHasUncommittedChangesRead() {
+ YQL_ENSURE(!QueryBlocks.empty());
+ auto& curBlock = QueryBlocks.back();
+ curBlock.HasUncommittedChangesRead = true;
+ }
+
TKiExploreTxResults()
: HasExecute(false) {}
};
@@ -133,6 +177,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Select, read.Pos(), ctx));
+ if (txRes.HasEffects(table)) {
+ txRes.AddQueryBlock();
+ txRes.SetBlockHasUncommittedChangesRead();
+ }
return result;
}
@@ -147,7 +195,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto result = ExploreTx(write.World(), ctx, dataSink, txRes);
auto tableOp = GetTableOp(write);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, write.Pos(), ctx));
- txRes.Effects.push_back(node);
+ txRes.AddEffect(node, table);
return result;
}
@@ -161,7 +209,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(update.World(), ctx, dataSink, txRes);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Update, update.Pos(), ctx));
- txRes.Effects.push_back(node);
+ txRes.AddEffect(node, table);
return result;
}
@@ -175,7 +223,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(del.World(), ctx, dataSink, txRes);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Delete, del.Pos(), ctx));
- txRes.Effects.push_back(node);
+ txRes.AddEffect(node, table);
return result;
}
@@ -327,7 +375,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
{
txRes.Ops.insert(node.Raw());
bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes);
- txRes.Results.push_back(node);
+ txRes.AddResult(node);
return result;
}
@@ -408,28 +456,42 @@ TExprNode::TPtr MakeSchemeTx(TCoCommit commit, TExprContext& ctx) {
}
TKiDataQuery MakeKiDataQuery(TExprBase node, const TKiExploreTxResults& txExplore, TExprContext& ctx) {
- TExprNode::TListType queryResults;
- for (auto& result : txExplore.Results) {
- auto resWrite = result.Cast<TResWriteBase>();
+ TVector<TKiDataQueryBlock> queryBlocks;
+ queryBlocks.reserve(txExplore.QueryBlocks.size());
- auto kiResult = Build<TKiResult>(ctx, node.Pos())
- .Value(resWrite.Data())
- .Columns(GetResultColumns(resWrite, ctx))
- .RowsLimit().Build(GetResultRowsLimit(resWrite))
- .Done();
+ for (const auto& block : txExplore.QueryBlocks) {
+ TKiDataQueryBlockSettings settings;
+ settings.HasUncommittedChangesRead = block.HasUncommittedChangesRead;
- queryResults.push_back(kiResult.Ptr());
+ TExprNode::TListType queryResults;
+ for (auto& result : block.Results) {
+ auto resWrite = result.Cast<TResWriteBase>();
+
+ auto kiResult = Build<TKiResult>(ctx, node.Pos())
+ .Value(resWrite.Data())
+ .Columns(GetResultColumns(resWrite, ctx))
+ .RowsLimit().Build(GetResultRowsLimit(resWrite))
+ .Done();
+
+ queryResults.push_back(kiResult.Ptr());
+ }
+ queryBlocks.emplace_back(Build<TKiDataQueryBlock>(ctx, node.Pos())
+ .Results()
+ .Add(queryResults)
+ .Build()
+ .Effects()
+ .Add(block.Effects)
+ .Build()
+ .Settings(settings.BuildNode(ctx, node.Pos()))
+ .Done());
}
auto query = Build<TKiDataQuery>(ctx, node.Pos())
.Operations()
.Add(txExplore.TableOperations)
.Build()
- .Results()
- .Add(queryResults)
- .Build()
- .Effects()
- .Add(txExplore.Effects)
+ .Blocks()
+ .Add(queryBlocks)
.Build()
.Done();
@@ -527,36 +589,44 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx) {
.Input(execQuery)
.Done();
- if (!txExplore.Results.empty()) {
+
+ bool hasResults = false;
+ for (const auto& block : txExplore.QueryBlocks) {
+ hasResults = hasResults || !block.Results.empty();
+ }
+
+ if (hasResults) {
auto execRight = Build<TCoRight>(ctx, node.Pos())
.Input(execQuery)
.Done()
.Ptr();
- for (size_t i = 0; i < txExplore.Results.size(); ++i) {
- auto result = txExplore.Results[i].Cast<TResWriteBase>();
-
- auto extractValue = Build<TCoNth>(ctx, node.Pos())
- .Tuple(execRight)
- .Index().Build(i)
- .Done()
- .Ptr();
-
- auto newResult = ctx.ChangeChild(
- *ctx.ChangeChild(
- result.Ref(),
- TResWriteBase::idx_World,
- execWorld.Ptr()
- ),
- TResWriteBase::idx_Data,
- std::move(extractValue)
- );
-
- execWorld = Build<TCoCommit>(ctx, node.Pos())
- .World(newResult)
- .DataSink<TResultDataSink>()
- .Build()
- .Done();
+ for (auto& block : txExplore.QueryBlocks) {
+ for (size_t i = 0; i < block.Results.size(); ++i) {
+ auto result = block.Results[i].Cast<TResWriteBase>();
+
+ auto extractValue = Build<TCoNth>(ctx, node.Pos())
+ .Tuple(execRight)
+ .Index().Build(i)
+ .Done()
+ .Ptr();
+
+ auto newResult = ctx.ChangeChild(
+ *ctx.ChangeChild(
+ result.Ref(),
+ TResWriteBase::idx_World,
+ execWorld.Ptr()
+ ),
+ TResWriteBase::idx_Data,
+ std::move(extractValue)
+ );
+
+ execWorld = Build<TCoCommit>(ctx, node.Pos())
+ .World(newResult)
+ .DataSink<TResultDataSink>()
+ .Build()
+ .Done();
+ }
}
}
@@ -586,9 +656,7 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
return node.Ptr();
}
- auto dataQuery = Build<TKiDataQuery>(ctx, node.Pos())
- .Operations()
- .Build()
+ auto queryBlock = Build<TKiDataQueryBlock>(ctx, node.Pos())
.Results()
.Add()
.Value(resFill.Data())
@@ -600,6 +668,14 @@ TExprNode::TPtr KiBuildResult(TExprBase node, const TString& cluster, TExprCont
.Build()
.Done();
+ auto dataQuery = Build<TKiDataQuery>(ctx, node.Pos())
+ .Operations()
+ .Build()
+ .Blocks()
+ .Add(queryBlock)
+ .Build()
+ .Done();
+
auto exec = Build<TKiExecDataQuery>(ctx, node.Pos())
.World(resFill.World())
.DataSink<TKiDataSink>()
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
index 0e4f11db3f1..33f62431415 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
@@ -50,6 +50,7 @@ struct TKikimrData {
DataSinkNames.insert(TKiCreateGroup::CallableName());
DataSinkNames.insert(TKiAlterGroup::CallableName());
DataSinkNames.insert(TKiDropGroup::CallableName());
+ DataSinkNames.insert(TKiDataQueryBlock::CallableName());
DataSinkNames.insert(TKiDataQuery::CallableName());
DataSinkNames.insert(TKiExecDataQuery::CallableName());
DataSinkNames.insert(TKiEffects::CallableName());
@@ -571,6 +572,33 @@ bool AddDmlIssue(const TIssue& issue, bool strictDml, TExprContext& ctx) {
}
}
+TKiDataQueryBlockSettings TKiDataQueryBlockSettings::Parse(const NNodes::TKiDataQueryBlock& node) {
+ TKiDataQueryBlockSettings settings;
+
+ for (const auto& tuple : node.Settings()) {
+ auto name = tuple.Name().Value();
+ if (name == HasUncommittedChangesReadSettingName) {
+ settings.HasUncommittedChangesRead = true;
+ }
+ }
+
+ return settings;
+}
+
+NNodes::TCoNameValueTupleList TKiDataQueryBlockSettings::BuildNode(TExprContext& ctx, TPositionHandle pos) const {
+ TVector<TCoNameValueTuple> settings;
+
+ if (HasUncommittedChangesRead) {
+ settings.push_back(Build<TCoNameValueTuple>(ctx, pos)
+ .Name().Build(HasUncommittedChangesReadSettingName)
+ .Done());
+ }
+
+ return Build<TCoNameValueTupleList>(ctx, pos)
+ .Add(settings)
+ .Done();
+}
+
TKiExecDataQuerySettings TKiExecDataQuerySettings::Parse(TKiExecDataQuery exec) {
TKiExecDataQuerySettings settings;
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 396bdc12520..149eea729b8 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -319,7 +319,7 @@ public:
template<class IterableKqpTableOps, class IterableKqpTableInfos>
bool ApplyTableOperations(const IterableKqpTableOps& operations,
const IterableKqpTableInfos& tableInfos, NKikimrKqp::EIsolationLevel isolationLevel, bool strictDml,
- EKikimrQueryType queryType, TExprContext& ctx)
+ bool enableImmediateEffects, EKikimrQueryType queryType, TExprContext& ctx)
{
if (IsClosed()) {
TString message = TStringBuilder() << "Cannot perform operations on closed transaction.";
@@ -396,8 +396,8 @@ public:
}
auto& currentOps = TableOperations[table];
-
- if (currentOps & KikimrModifyOps()) {
+ bool currentModify = currentOps & KikimrModifyOps();
+ if (currentModify && !enableImmediateEffects) {
if (KikimrRequireUnmodifiedOps() & newOp) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' can't be performed on previously modified table: " << table;
@@ -430,7 +430,7 @@ public:
// TODO: KIKIMR-3206
bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn);
bool newUpdate = newOp == TYdbOperation::Update;
- if (currentDelete && newUpdate) {
+ if (currentDelete && newUpdate && !enableImmediateEffects) {
TString message = TStringBuilder() << "Operation '" << newOp
<< "' may lead to unexpected results when applied to table with deleted rows: " << table;
if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), strictDml, ctx)) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index ca775e2e1cf..2a0646bc7b0 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -44,6 +44,7 @@ private:
virtual TStatus HandleKql(NNodes::TCallable node, TExprContext& ctx) = 0;
virtual TStatus HandleExecDataQuery(NNodes::TKiExecDataQuery node, TExprContext& ctx) = 0;
virtual TStatus HandleDataQuery(NNodes::TKiDataQuery node, TExprContext& ctx) = 0;
+ virtual TStatus HandleDataQueryBlock(NNodes::TKiDataQueryBlock node, TExprContext& ctx) = 0;
virtual TStatus HandleEffects(NNodes::TKiEffects node, TExprContext& ctx) = 0;
};
@@ -96,6 +97,14 @@ private:
TMaybe<TString> View;
};
+struct TKiDataQueryBlockSettings {
+ static constexpr std::string_view HasUncommittedChangesReadSettingName = "has_uncommitted_changes_read"sv;
+ bool HasUncommittedChangesRead = false;
+
+ static TKiDataQueryBlockSettings Parse(const NNodes::TKiDataQueryBlock& node);
+ NNodes::TCoNameValueTupleList BuildNode(TExprContext& ctx, TPositionHandle pos) const;
+};
+
struct TKiExecDataQuerySettings {
TMaybe<TString> Mode;
TVector<NNodes::TCoNameValueTuple> Other;
diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
index 2be0482632b..76a9f4af183 100644
--- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp
@@ -1220,7 +1220,7 @@ private:
return TStatus::Ok;
}
- virtual TStatus HandleDataQuery(NNodes::TKiDataQuery node, TExprContext& ctx) override {
+ virtual TStatus HandleDataQueryBlock(NNodes::TKiDataQueryBlock node, TExprContext& ctx) override {
if (!EnsureWorldType(node.Effects().Ref(), ctx)) {
return TStatus::Error;
}
@@ -1252,6 +1252,23 @@ private:
return TStatus::Ok;
}
+ virtual TStatus HandleDataQuery(NNodes::TKiDataQuery node, TExprContext& ctx) override {
+ TTypeAnnotationNode::TListType resultTypes;
+ for (const auto& block : node.Blocks()) {
+ auto blockType = block.Ref().GetTypeAnn();
+ if (!EnsureTupleType(block.Pos(), *blockType, ctx)) {
+ return TStatus::Error;
+ }
+
+ for (const auto& resultType : blockType->Cast<TTupleExprType>()->GetItems()) {
+ resultTypes.push_back(resultType);
+ }
+ }
+
+ node.Ptr()->SetTypeAnn(ctx.MakeType<TTupleExprType>(resultTypes));
+ return TStatus::Ok;
+ }
+
virtual TStatus HandleExecDataQuery(NNodes::TKiExecDataQuery node, TExprContext& ctx) override {
if (!EnsureWorldType(node.World().Ref(), ctx)) {
return TStatus::Error;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 89535b80349..99cccc6589c 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -89,6 +89,10 @@ inline bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikim
}
}
+ if (config.FeatureFlags.GetEnableKqpImmediateEffects() && physicalQuery.GetHasUncommittedChangesRead()) {
+ return true;
+ }
+
// We don't want snapshot when there are effects at the moment,
// because it hurts performance when there are multiple single-shard
// reads and a single distributed commit. Taking snapshot costs
@@ -768,9 +772,11 @@ public:
std::pair<bool, TIssues> ApplyTableOperations(TKqpTransactionContext* txCtx, const NKqpProto::TKqpPhyQuery& query) {
auto isolationLevel = *txCtx->EffectiveIsolationLevel;
bool strictDml = Config->StrictDml.Get(Settings.Cluster).GetOrElse(false);
+ bool enableImmediateEffects = Config->FeatureFlags.GetEnableKqpImmediateEffects();
TExprContext ctx;
- bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel, strictDml, EKikimrQueryType::Dml, ctx);
+ bool success = txCtx->ApplyTableOperations(query.GetTableOps(), query.GetTableInfos(), isolationLevel, strictDml,
+ enableImmediateEffects, EKikimrQueryType::Dml, ctx);
return {success, ctx.IssueManager.GetIssues()};
}
@@ -1086,30 +1092,37 @@ public:
std::shared_ptr<const NKqpProto::TKqpPhyTx> tx;
if (QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) {
- tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>( QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx));
+ tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx));
}
- while (tx && tx->GetHasEffects()) {
- if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) {
- ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST)
- << "Failed to mix queries with old- and new- engines";
- }
- LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
+ if (!Config->FeatureFlags.GetEnableKqpImmediateEffects()) {
+ while (tx && tx->GetHasEffects()) {
+ YQL_ENSURE(txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx)));
+ LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
- if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
- ++QueryState->CurrentTx;
- tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery,
- &phyQuery.GetTransactions(QueryState->CurrentTx));
- } else {
- tx = nullptr;
- break;
+ if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
+ ++QueryState->CurrentTx;
+ tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery,
+ &phyQuery.GetTransactions(QueryState->CurrentTx));
+ } else {
+ tx = nullptr;
+ break;
+ }
}
}
+
if (!CheckTransacionLocks() || !CheckTopicOperations()) {
return;
}
- bool commit = QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1;
+ bool commit = false;
+ if (QueryState->Commit && Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) {
+ // every phy tx should acquire LockTxId, so commit is sent separately at the end
+ commit = QueryState->CurrentTx >= phyQuery.TransactionsSize();
+ } else if (QueryState->Commit) {
+ commit = QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1;
+ }
+
if (tx || commit) {
bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit);
if (!replied) {
@@ -1164,10 +1177,11 @@ public:
}
request.Transactions.emplace_back(tx, PrepareParameters(*tx));
+ txCtx.HasImmediateEffects = txCtx.HasImmediateEffects || tx->GetHasEffects();
} else {
YQL_ENSURE(commit);
- if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) {
+ if (!txCtx.TxHasEffects() && !txCtx.Locks.HasLocks() && !txCtx.TopicOperations.HasOperations()) {
ReplySuccess();
return true;
}
@@ -1191,7 +1205,7 @@ public:
}
if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasReadOperations()) {
- request.ValidateLocks = !(txCtx.GetSnapshot().IsValid() && txCtx.DeferredEffects.Empty()) ||
+ request.ValidateLocks = !txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() ||
txCtx.TopicOperations.HasReadOperations();
request.EraseLocks = true;
@@ -1262,7 +1276,7 @@ public:
const auto& locks = result.GetLocks();
auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx);
if (!success) {
- if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) {
+ if (!txCtx->GetSnapshot().IsValid() || txCtx->TxHasEffects()) {
ReplyQueryError(Ydb::StatusIds::ABORTED, "Error while locks merge",
MessageFromIssues(issues));
return false;
diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp
index 73340514207..c253e526ae2 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.cpp
+++ b/ydb/core/kqp/session_actor/kqp_tx.cpp
@@ -89,7 +89,7 @@ bool MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue&
return false;
} else {
txCtx.Locks.MarkBroken(issues.back());
- if (!txCtx.DeferredEffects.Empty()) {
+ if (txCtx.TxHasEffects()) {
txCtx.Locks.ReportIssues(ctx);
return false;
}
diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h
index e4bf301cec0..913ade9e977 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.h
+++ b/ydb/core/kqp/session_actor/kqp_tx.h
@@ -134,6 +134,10 @@ public:
return DeferredEffects.Add(std::move(physicalTx), std::move(params));
}
+ bool TxHasEffects() const {
+ return HasImmediateEffects || !DeferredEffects.Empty();
+ }
+
const IKqpGateway::TKqpSnapshot& GetSnapshot() const {
return SnapshotHandle.Snapshot;
}
@@ -171,6 +175,7 @@ public:
DeferredEffects.Clear();
ParamsState = MakeIntrusive<TParamsState>();
SnapshotHandle.Snapshot = IKqpGateway::TKqpSnapshot::InvalidSnapshot;
+ HasImmediateEffects = false;
}
TKqpTransactionInfo GetInfo() const;
@@ -225,6 +230,7 @@ public:
TKqpTxLocks Locks;
TDeferredEffects DeferredEffects;
+ bool HasImmediateEffects = false;
NTopic::TTopicOperations TopicOperations;
TIntrusivePtr<TParamsState> ParamsState;
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index dd327e67366..bf02c9e80d2 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -99,6 +99,16 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
ServerSettings->SetFrFactory(&UdfFrFactory);
ServerSettings->SetEnableNotNullColumns(true);
ServerSettings->SetEnableMoveIndex(true);
+
+ if (settings.FeatureFlags.GetEnableKqpImmediateEffects()) {
+ Tests::TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ ServerSettings->SetControls(controls);
+ }
+
if (settings.LogStream)
ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream));
diff --git a/ydb/core/kqp/ut/kqp_effects_ut.cpp b/ydb/core/kqp/ut/kqp_effects_ut.cpp
index 445136b7014..493699ed3c8 100644
--- a/ydb/core/kqp/ut/kqp_effects_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_effects_ut.cpp
@@ -512,6 +512,227 @@ Y_UNIT_TEST_SUITE(KqpEffects) {
UNIT_ASSERT_VALUES_EQUAL(reads[0]["type"], "Lookup");
UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3);
}
+
+ Y_UNIT_TEST(ImmediateInsert) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+
+ CREATE TABLE `/Root/TestImmediateInsert` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync());
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES
+ (1u, "One"),
+ (2u, "Two");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM `/Root/TestImmediateInsert`;
+ INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES
+ (3u, "Three"),
+ (4u, "Four");
+
+ SELECT * FROM `/Root/TestImmediateInsert`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]];
+ [[3u];["Three"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(1)));
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES (5u, "Five");
+ INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES (6u, "Six");
+ INSERT INTO `/Root/TestImmediateInsert` (Key, Value) VALUES (7u, "Seven");
+
+ SELECT * FROM `/Root/TestImmediateInsert`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]];
+ [[3u];["Three"]];
+ [[4u];["Four"]];
+ [[5u];["Five"]];
+ [[6u];["Six"]];
+ [[7u];["Seven"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ImmediateUpdate) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+
+ CREATE TABLE `/Root/TestImmediateUpdate` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync());
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO `/Root/TestImmediateUpdate` (Key, Value) VALUES
+ (1u, "One"),
+ (2u, "Two");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM `/Root/TestImmediateUpdate`;
+ UPDATE `/Root/TestImmediateUpdate` ON (Key, Value) VALUES
+ (1u, "Updated1"),
+ (2u, "Updated2");
+
+ SELECT * FROM `/Root/TestImmediateUpdate`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([
+ [[1u];["Updated1"]];
+ [[2u];["Updated2"]]
+ ])", FormatResultSetYson(result.GetResultSet(1)));
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPDATE `/Root/TestImmediateUpdate` ON (Key, Value) VALUES
+ (1u, "Updated3"),
+ (2u, "Updated4");
+
+ UPDATE `/Root/TestImmediateUpdate` ON (Key, Value) VALUES
+ (1u, "Updated5");
+
+ SELECT * FROM `/Root/TestImmediateUpdate`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["Updated5"]];
+ [[2u];["Updated4"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(ImmediateDelete) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ AssertSuccessResult(session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+
+ CREATE TABLE `/Root/TestImmediateDelete` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync());
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO `/Root/TestImmediateDelete` (Key, Value) VALUES
+ (1u, "One"),
+ (2u, "Two"),
+ (3u, "Three"),
+ (4u, "Four");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM `/Root/TestImmediateDelete`;
+ DELETE FROM `/Root/TestImmediateDelete` WHERE Key = 4;
+
+ SELECT * FROM `/Root/TestImmediateDelete`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]];
+ [[3u];["Three"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]];
+ [[3u];["Three"]]
+ ])", FormatResultSetYson(result.GetResultSet(1)));
+ }
+
+ {
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ DELETE FROM `/Root/TestImmediateDelete` WHERE Key > 2;
+ DELETE FROM `/Root/TestImmediateDelete` WHERE Key < 2;
+
+ SELECT * FROM `/Root/TestImmediateDelete`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[2u];["Two"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 60e7fe2487d..ad2cf7f7643 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -747,6 +747,7 @@ message TFeatureFlags {
optional bool EnableChangefeedInitialScan = 77 [default = false];
optional bool EnableKqpScanQuerySourceRead = 78 [default = false];
optional bool EnableDynamicNodeAuthorization = 79 [default = false];
+ optional bool EnableKqpImmediateEffects = 80 [default = false];
}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 54e6603fd28..c049b2cd9ab 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -337,4 +337,5 @@ message TKqpPhyQuery {
// StrictDml constraints info
repeated TKqpTableOp TableOps = 7;
repeated TKqpTableInfo TableInfos = 8;
+ bool HasUncommittedChangesRead = 9;
}
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 9e57bfa874a..85a086c269a 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -41,6 +41,7 @@ public:
FEATURE_FLAG_SETTER(EnableArrowFormatAtDatashard)
FEATURE_FLAG_SETTER(EnableGrpcAudit)
FEATURE_FLAG_SETTER(EnableChangefeedInitialScan)
+ FEATURE_FLAG_SETTER(EnableKqpImmediateEffects)
TDerived& SetEnableMvcc(std::optional<bool> value) {
if (value) {