summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <[email protected]>2022-03-05 11:54:35 +0300
committerIgor Makunin <[email protected]>2022-03-05 11:54:35 +0300
commitd2cc973b3c4479746b818ca83fc1578d75bf75e7 (patch)
treec8f7ea061fe1611089fb17cbf50c3a58a283ea43
parent764d7eeeac4d68d0c28028efd1bc1969c6fe08fd (diff)
KIKIMR-14379: more safe inpace update
ref:097aed9d5be04a5a92eaa3c1e69ca28ff8685dde
-rw-r--r--ydb/core/kqp/opt/kqp_opt_effects.cpp170
-rw-r--r--ydb/core/kqp/ut/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp421
-rw-r--r--ydb/core/kqp/ut/kqp_newengine_ut.cpp43
-rw-r--r--ydb/core/kqp/ut/ya.make1
5 files changed, 582 insertions, 54 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp
index f5f2401ce9f..48fe4039bf6 100644
--- a/ydb/core/kqp/opt/kqp_opt_effects.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp
@@ -24,18 +24,110 @@ bool InplaceUpdateEnabled(const TKikimrConfiguration& config) {
return true;
}
-bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input) {
- // TODO: Check for non-deterministic & unsafe functions (like UDF).
- // TODO: Once we have partitioning constraints implemented in query optimizer,
- // use them to detect map writes.
- if (!input.Maybe<TCoFlatMap>().Input().Maybe<TKqlReadTableBase>()) {
+bool IsSingleKeyStream(const TExprBase& stream, TExprContext&) {
+ auto asList = stream.Maybe<TCoIterator>().List().Maybe<TCoAsList>();
+ if (!asList) {
return false;
}
- auto flatmap = input.Cast<TCoFlatMap>();
- auto read = flatmap.Input().Cast<TKqlReadTableBase>();
+ if (asList.Cast().ArgCount() > 1) {
+ return false;
+ }
+
+ auto asStruct = asList.Cast().Arg(0).Maybe<TCoAsStruct>();
+ if (!asStruct) {
+ return false;
+ }
+
+ return true;
+}
+
+const THashSet<TStringBuf> SafeCallables {
+ TCoJust::CallableName(),
+ TCoCoalesce::CallableName(),
+ TCoToOptional::CallableName(),
+ TCoHead::CallableName(),
+ TCoLast::CallableName(),
+ TCoNth::CallableName(),
+ TCoToList::CallableName(),
+ TCoAsList::CallableName(),
+
+ TCoMember::CallableName(),
+ TCoAsStruct::CallableName(),
+
+ TCoNothing::CallableName(),
+ TCoNull::CallableName(),
+ TCoDefault::CallableName(),
+ TCoExists::CallableName(),
+
+ TCoIf::CallableName(),
+
+ TCoDataType::CallableName(),
+ TCoOptionalType::CallableName(),
+
+ TCoParameter::CallableName(),
+
+ "Concat",
+ "Substring",
+};
+
+bool IsStructOrOptionalStruct(const NYql::TTypeAnnotationNode* type) {
+ if (type->GetKind() == ETypeAnnotationKind::Struct) {
+ return true;
+ }
+
+ if (type->GetKind() == ETypeAnnotationKind::Optional) {
+ return type->Cast<TOptionalExprType>()->GetItemType()->GetKind() == ETypeAnnotationKind::Struct;
+ }
+
+ return false;
+}
+
+bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input, TExprContext& ctx) {
+// #define DBG YQL_CLOG(ERROR, ProviderKqp)
+#define DBG TStringBuilder()
+
+ DBG << "--> " << KqpExprToPrettyString(input, ctx);
+
+ auto maybeFlatMap = input.Maybe<TCoFlatMap>();
+ if (!maybeFlatMap) {
+ return false;
+ }
+ auto flatmap = maybeFlatMap.Cast();
+
+ if (!IsStructOrOptionalStruct(flatmap.Lambda().Ref().GetTypeAnn())) {
+ DBG << " --> FlatMap with expanding lambda: " << *flatmap.Lambda().Ref().GetTypeAnn();
+ return false;
+ }
+
+ auto maybeLookupTable = flatmap.Input().Maybe<TKqpLookupTable>();
+ if (!maybeLookupTable) {
+ maybeLookupTable = flatmap.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqpLookupTable>();
+ }
+
+ if (!maybeLookupTable) {
+ DBG << " --> not FlatMap over KqpLookupTable";
+ return false;
+ }
+
+ auto read = maybeLookupTable.Cast();
+ // check same table
if (table.Metadata->PathId.ToString() != read.Table().PathId().Value()) {
+ DBG << " --> not same table";
+ return false;
+ }
+
+ // check keys count
+ if (!IsSingleKeyStream(read.LookupKeys(), ctx)) {
+ DBG << " --> not single key stream";
+ return false;
+ }
+
+ // full key (not prefix)
+ auto* lookupKeyType = GetSeqItemType(read.LookupKeys().Ref().GetTypeAnn());
+ if (table.Metadata->KeyColumnNames.size() != lookupKeyType->Cast<TStructExprType>()->GetSize()) {
+ DBG << " --> not full key";
return false;
}
@@ -52,7 +144,62 @@ bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input) {
}
}
- return true;
+ auto lambda = flatmap.Lambda();
+ if (!lambda.Ref().IsComplete()) {
+ return false;
+ }
+
+ TMaybeNode<TExprBase> notSafeNode;
+ // white list of callables in lambda
+ VisitExpr(lambda.Body().Ptr(),
+ [&notSafeNode] (const TExprNode::TPtr&) {
+ return !notSafeNode;
+ },
+ [&notSafeNode](const TExprNode::TPtr& node) {
+ if (notSafeNode) {
+ return false;
+ }
+ if (node->IsCallable()) {
+ DBG << " --> visit: " << node->Content();
+
+ auto expr = TExprBase(node);
+
+ if (expr.Maybe<TCoDataCtor>()) {
+ return true;
+ }
+ if (expr.Maybe<TCoCompare>()) {
+ return true;
+ }
+ if (expr.Maybe<TCoAnd>()) {
+ return true;
+ }
+ if (expr.Maybe<TCoOr>()) {
+ return true;
+ }
+ if (expr.Maybe<TCoBinaryArithmetic>()) {
+ return true;
+ }
+ if (expr.Maybe<TCoCountBase>()) {
+ return true;
+ }
+
+ if (SafeCallables.contains(node->Content())) {
+ return true;
+ }
+
+ // TODO: allowed UDFs
+
+ notSafeNode = expr;
+ DBG << " --> not safe node: " << node->Content();
+ return false;
+ }
+
+ return true;
+ });
+
+ return !notSafeNode;
+
+#undef DBG
}
TDqPhyPrecompute BuildPrecomputeStage(TExprBase expr, TExprContext& ctx) {
@@ -108,9 +255,10 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const
auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, node.Table().Path());
auto dqUnion = node.Input().Cast<TDqCnUnionAll>();
- auto input = dqUnion.Output().Stage().Program().Body();
+ auto program = dqUnion.Output().Stage().Program();
+ auto input = program.Body();
- if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input)) {
+ if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input, ctx)) {
stageInput = Build<TKqpCnMapShard>(ctx, node.Pos())
.Output()
.Stage(dqUnion.Output().Stage())
@@ -171,7 +319,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const
auto dqUnion = node.Input().Cast<TDqCnUnionAll>();
auto input = dqUnion.Output().Stage().Program().Body();
- if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input)) {
+ if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input, ctx)) {
stageInput = Build<TKqpCnMapShard>(ctx, node.Pos())
.Output()
.Stage(dqUnion.Output().Stage())
diff --git a/ydb/core/kqp/ut/CMakeLists.txt b/ydb/core/kqp/ut/CMakeLists.txt
index 7de5aeda44e..42606bd4403 100644
--- a/ydb/core/kqp/ut/CMakeLists.txt
+++ b/ydb/core/kqp/ut/CMakeLists.txt
@@ -45,6 +45,7 @@ target_sources(ydb-core-kqp-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_locks_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_merge_connection_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_mvcc_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_effects_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_flowcontrol_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_ut.cpp
diff --git a/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp b/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp
new file mode 100644
index 00000000000..79a8839c596
--- /dev/null
+++ b/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp
@@ -0,0 +1,421 @@
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+Y_UNIT_TEST_SUITE(KqpNewEngineInplaceUpdate) {
+
+void PrepareTable(TSession& session) {
+ auto ret = session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/InplaceUpdate` (
+ Key Uint64,
+ ValueStr String,
+ ValueInt Uint64,
+ ValueDbl Double,
+ PRIMARY KEY(Key)
+ ) WITH (
+ PARTITION_AT_KEYS = (10)
+ )
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(ret.GetStatus(), EStatus::SUCCESS, ret.GetIssues().ToString());
+
+ auto result = session.ExecuteDataQuery(R"(
+ UPSERT INTO `/Root/InplaceUpdate` (Key, ValueStr, ValueInt, ValueDbl) VALUES
+ (1, "One", 100, 101.0),
+ (20, "Two", 200, 202.0)
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+}
+
+void Test(bool enableInplaceUpdate, const TString& query, TParams&& params, const TString& expectedResult,
+ std::function<void(const Ydb::TableStats::QueryStats&)>&& check)
+{
+ TKikimrRunner kikimr;
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+
+ PrepareTable(session);
+
+ TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ auto q = TStringBuilder()
+ << R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = 'true';
+ PRAGMA kikimr.OptEnableInplaceUpdate = ')" << (enableInplaceUpdate ? "true" : "false") << "';" << Endl
+ << query;
+
+ auto result = session.ExecuteDataQuery(q, TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ check(NYdb::TProtoAccessor::GetProto(*result.GetStats()));
+
+ result = session.ExecuteDataQuery(R"(
+ PRAGMA kikimr.UseNewEngine = 'true';
+ SELECT Key, ValueStr, ValueInt, ValueDbl FROM `/Root/InplaceUpdate` ORDER BY Key;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ CompareYson(expectedResult, FormatResultSetYson(result.GetResultSet(0)));
+}
+
+#define ASSERT_LITERAL_PHASE(stats, phaseNo) \
+ UNIT_ASSERT_C(stats.query_phases(phaseNo).table_access().empty(), stats.DebugString());
+
+#define ASSERT_PHASE(stats, phaseNo, table, readsCnt, updatesCnt) \
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access().size(), 1, stats.DebugString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).name(), table, stats.DebugString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).reads().rows(), readsCnt, stats.DebugString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).updates().rows(), updatesCnt, stats.DebugString()); \
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).partitions_count(), std::max(readsCnt, updatesCnt), stats.DebugString());
+
+
+Y_UNIT_TEST_TWIN(SingleRowSimple, EnableInplaceUpdate) {
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key AS Uint64;
+ DECLARE $value AS String;
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueStr = $value
+ WHERE Key = $key
+ )",
+ TParamsBuilder()
+ .AddParam("$key").Uint64(1).Build()
+ .AddParam("$value").String("updated").Build()
+ .Build(),
+ R"([
+ [[1u];["updated"];[100u];[101.]];
+ [[20u];["Two"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ if constexpr (EnableInplaceUpdate) {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0);
+ ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1);
+ }
+ });
+}
+
+Y_UNIT_TEST_TWIN(SingleRowStr, EnableInplaceUpdate) {
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key AS Uint64;
+ DECLARE $value AS String;
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueStr = Substring(ValueStr || $value, 1)
+ WHERE Key = $key
+ )",
+ TParamsBuilder()
+ .AddParam("$key").Uint64(1).Build()
+ .AddParam("$value").String("updated").Build()
+ .Build(),
+ R"([
+ [[1u];["neupdated"];[100u];[101.]];
+ [[20u];["Two"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ if constexpr (EnableInplaceUpdate) {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0);
+ ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1);
+ }
+ });
+}
+
+Y_UNIT_TEST_TWIN(SingleRowArithm, EnableInplaceUpdate) {
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key AS Uint64;
+ DECLARE $x AS Uint64;
+ DECLARE $y AS Double;
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueInt = (ValueInt + $x) * ($x + 1ul),
+ ValueDbl = (ValueDbl - $y) / ($y + 1.)
+ WHERE Key = $key
+ )",
+ TParamsBuilder()
+ .AddParam("$key").Uint64(1).Build()
+ .AddParam("$x").Uint64(10).Build()
+ .AddParam("$y").Double(5.0).Build()
+ .Build(),
+ R"([
+ [[1u];["One"];[1210u];[16.]];
+ [[20u];["Two"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ if constexpr (EnableInplaceUpdate) {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0);
+ ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1);
+ }
+ });
+}
+
+Y_UNIT_TEST_TWIN(SingleRowIf, EnableInplaceUpdate) {
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key AS Uint64;
+
+ $trim = ($v, $min, $max) -> {
+ RETURN CASE
+ WHEN $v > $max THEN $max
+ WHEN $v < $min THEN $min
+ ELSE $v
+ END;
+ };
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueInt = $trim(ValueInt, 0ul, 10ul) + 1ul,
+ ValueDbl = $trim(ValueDbl, 0., 10.) / 10.0
+ WHERE Key = $key
+ )",
+ TParamsBuilder()
+ .AddParam("$key").Uint64(1).Build()
+ .Build(),
+ R"([
+ [[1u];["One"];[11u];[1.]];
+ [[20u];["Two"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ if constexpr (EnableInplaceUpdate) {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0);
+ ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1);
+ }
+ });
+}
+
+// allow multiple keys in KqpLookupTable to enable this test
+Y_UNIT_TEST_TWIN(Negative_SingleRowWithKeyCast, EnableInplaceUpdate) {
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key AS Uint32; -- not Uint64
+ DECLARE $value AS String;
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueStr = $value
+ WHERE Key = $key
+ )",
+ TParamsBuilder()
+ .AddParam("$key").Uint32(1).Build()
+ .AddParam("$value").String("updated").Build()
+ .Build(),
+ R"([
+ [[1u];["updated"];[100u];[101.]];
+ [[20u];["Two"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ // if constexpr (EnableInplaceUpdate) {
+ // UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString());
+ // ASSERT_LITERAL_PHASE(stats, 0);
+ // ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 1, 1);
+ // } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 3, stats.DebugString());
+ ASSERT_LITERAL_PHASE(stats, 0);
+ ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 1, 0);
+ ASSERT_PHASE(stats, 2, "/Root/InplaceUpdate", 0, 1);
+ // }
+ });
+}
+
+Y_UNIT_TEST_TWIN(Negative_SingleRowWithValueCast, EnableInplaceUpdate) {
+/*
+ (
+ (declare $key (DataType 'Uint64))
+ (declare $value (DataType 'Int32))
+ (let $1 (KqpTable '"/Root/InplaceUpdate" '"72057594046644480:11" '"" '1))
+ (let $2 (DataType 'Uint64))
+ (let $3 (KqpLookupTable $1 (Iterator (AsList (AsStruct '('"Key" $key)))) '('"Key")))
+ (return (FlatMap $3 (lambda '($4) (Just (AsStruct '('"Key" (Member $4 '"Key")) '('"ValueInt" (Just (Convert $value $2))))))))
+ )
+
+ `Convert` is not safe callable, so there is no InplaceUpdate optimization here
+*/
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key AS Uint64;
+ DECLARE $value AS Int32; -- not Uint64
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueInt = $value
+ WHERE Key = $key
+ )",
+ TParamsBuilder()
+ .AddParam("$key").Uint64(1).Build()
+ .AddParam("$value").Int32(1).Build()
+ .Build(),
+ R"([
+ [[1u];["One"];[1u];[101.]];
+ [[20u];["Two"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0);
+ ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1);
+ });
+}
+
+Y_UNIT_TEST_TWIN(Negative_SingleRowListFromRange, EnableInplaceUpdate) {
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key AS Uint64;
+
+ $foo = ($x) -> {
+ $list = ListFromRange(1, 10);
+ RETURN $x || ListConcat(Cast($list as List<String>), "..");
+ };
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueStr = $foo(ValueStr)
+ WHERE Key = $key
+ )",
+ TParamsBuilder()
+ .AddParam("$key").Uint64(1).Build()
+ .Build(),
+ R"([
+ [[1u];["One1..2..3..4..5..6..7..8..9"];[100u];[101.]];
+ [[20u];["Two"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString());
+ ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0);
+ ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1);
+ });
+}
+
+// allow multiple keys in KqpLookupTable to enable this test
+Y_UNIT_TEST_TWIN(Negative_BatchUpdate, EnableInplaceUpdate) {
+ Test(
+ EnableInplaceUpdate,
+ R"( DECLARE $key1 AS Uint64;
+ DECLARE $value1 AS String;
+ DECLARE $key2 AS Uint64;
+ DECLARE $value2 AS String;
+
+ $foo = ($x, $k1, $v1, $v2) -> {
+ RETURN CASE $x
+ WHEN $k1 THEN $v1
+ ELSE $v2
+ END;
+ };
+
+ UPDATE `/Root/InplaceUpdate` SET
+ ValueStr = $foo(Key, $key1, $value1, $value2)
+ WHERE Key IN [$key1, $key2]
+ )",
+ TParamsBuilder()
+ .AddParam("$key1").Uint64(1).Build()
+ .AddParam("$value1").String("updated-1").Build()
+ .AddParam("$key2").Uint64(20).Build()
+ .AddParam("$value2").String("updated-2").Build()
+ .Build(),
+ R"([
+ [[1u];["updated-1"];[100u];[101.]];
+ [[20u];["updated-2"];[200u];[202.]]
+ ])",
+ [](const Ydb::TableStats::QueryStats& stats) {
+ // if constexpr (EnableInplaceUpdate) {
+ // UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 3, stats.DebugString());
+ // ASSERT_LITERAL_PHASE(stats, 0);
+ // ASSERT_LITERAL_PHASE(stats, 1);
+ // ASSERT_PHASE(stats, 2, "/Root/InplaceUpdate", 2, 2);
+ // } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 4, stats.DebugString());
+ ASSERT_LITERAL_PHASE(stats, 0);
+ ASSERT_LITERAL_PHASE(stats, 1);
+ ASSERT_PHASE(stats, 2, "/Root/InplaceUpdate", 2, 0);
+ ASSERT_PHASE(stats, 3, "/Root/InplaceUpdate", 0, 2);
+ // }
+ });
+}
+
+Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) {
+ auto keysLimitSetting = NKikimrKqp::TKqpSetting();
+ keysLimitSetting.SetName("_CommitPerShardKeysSizeLimitBytes");
+ keysLimitSetting.SetValue("100");
+
+ TKikimrRunner kikimr({keysLimitSetting});
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ UNIT_ASSERT(session.ExecuteSchemeQuery(R"(
+ CREATE TABLE `/Root/Temp` (
+ Key Uint32,
+ Value1 String,
+ Value2 String,
+ PRIMARY KEY (Key)
+ );
+ )").GetValueSync().IsSuccess());
+
+ auto result = session.ExecuteDataQuery(R"(
+ REPLACE INTO `/Root/Temp` (Key, Value1) VALUES
+ (1u, "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"),
+ (3u, "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto query = Sprintf(R"(
+ PRAGMA kikimr.UseNewEngine = 'true';
+ PRAGMA kikimr.OptEnableInplaceUpdate = '%s';
+
+ DECLARE $Key AS Uint32;
+
+ UPDATE `/Root/Temp` SET
+ Value2 = Value1
+ WHERE Key = $Key
+ )", EnableInplaceUpdate ? "true" : "false");
+
+ auto params = db.GetParamsBuilder()
+ .AddParam("$Key").Uint32(1).Build()
+ .Build();
+
+ result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params).ExtractValueSync();
+
+ if constexpr (EnableInplaceUpdate) {
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST);
+ UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue) {
+ return issue.Message.Contains("READ_SIZE_EXECEEDED");
+ }));
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ result = session.ExecuteDataQuery(R"(
+ SELECT Value2 FROM `/Root/Temp` ORDER BY Value2;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ if constexpr (EnableInplaceUpdate) {
+ CompareYson(R"([[#];[#]])", FormatResultSetYson(result.GetResultSet(0)));
+ } else {
+ CompareYson(R"([
+ [#];[["123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+}
+
+} // suite
+
+} // namespace NKqp
+} // namespace NKikimr \ No newline at end of file
diff --git a/ydb/core/kqp/ut/kqp_newengine_ut.cpp b/ydb/core/kqp/ut/kqp_newengine_ut.cpp
index 4e99bea83d2..1043baef5e8 100644
--- a/ydb/core/kqp/ut/kqp_newengine_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_newengine_ut.cpp
@@ -1592,49 +1592,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
])", FormatResultSetYson(result.GetResultSet(0)));
}
- Y_UNIT_TEST(InplaceUpdate) {
- TKikimrRunner kikimr;
- auto db = kikimr.GetTableClient();
- auto session = db.CreateSession().GetValueSync().GetSession();
-
- NYdb::NTable::TExecDataQuerySettings execSettings;
- execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
-
- auto result = session.ExecuteDataQuery(R"(
- PRAGMA kikimr.UseNewEngine = "true";
- PRAGMA kikimr.OptEnableInplaceUpdate = "true";
-
- UPDATE [/Root/TwoShard]
- SET Value1 = "Updated"
- WHERE Value2 = 1;
- )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
- auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
-
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/TwoShard");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 2);
- UNIT_ASSERT(stats.query_phases(0).table_access(0).updates().bytes() > 0);
- UNIT_ASSERT(stats.query_phases(0).duration_us() > 0);
-
- result = session.ExecuteDataQuery(R"(
- PRAGMA kikimr.UseNewEngine = "true";
- SELECT * FROM [/Root/TwoShard] ORDER BY Key;
- )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
- CompareYson(R"([
- [[1u];["One"];[-1]];
- [[2u];["Two"];[0]];
- [[3u];["Updated"];[1]];
- [[4000000001u];["BigOne"];[-1]];
- [[4000000002u];["BigTwo"];[0]];
- [[4000000003u];["Updated"];[1]]
- ])", FormatResultSetYson(result.GetResultSet(0)));
- }
-
Y_UNIT_TEST(Delete) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
diff --git a/ydb/core/kqp/ut/ya.make b/ydb/core/kqp/ut/ya.make
index 609ec3237f8..5aa9abf952a 100644
--- a/ydb/core/kqp/ut/ya.make
+++ b/ydb/core/kqp/ut/ya.make
@@ -33,6 +33,7 @@ SRCS(
kqp_locks_ut.cpp
kqp_merge_connection_ut.cpp
kqp_mvcc_ut.cpp
+ kqp_newengine_inplace_update_ut.cpp
kqp_newengine_effects_ut.cpp
kqp_newengine_flowcontrol_ut.cpp
kqp_newengine_ut.cpp