diff options
author | Igor Makunin <[email protected]> | 2022-03-05 11:54:35 +0300 |
---|---|---|
committer | Igor Makunin <[email protected]> | 2022-03-05 11:54:35 +0300 |
commit | d2cc973b3c4479746b818ca83fc1578d75bf75e7 (patch) | |
tree | c8f7ea061fe1611089fb17cbf50c3a58a283ea43 | |
parent | 764d7eeeac4d68d0c28028efd1bc1969c6fe08fd (diff) |
KIKIMR-14379: more safe inpace update
ref:097aed9d5be04a5a92eaa3c1e69ca28ff8685dde
-rw-r--r-- | ydb/core/kqp/opt/kqp_opt_effects.cpp | 170 | ||||
-rw-r--r-- | ydb/core/kqp/ut/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp | 421 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_newengine_ut.cpp | 43 | ||||
-rw-r--r-- | ydb/core/kqp/ut/ya.make | 1 |
5 files changed, 582 insertions, 54 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index f5f2401ce9f..48fe4039bf6 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -24,18 +24,110 @@ bool InplaceUpdateEnabled(const TKikimrConfiguration& config) { return true; } -bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input) { - // TODO: Check for non-deterministic & unsafe functions (like UDF). - // TODO: Once we have partitioning constraints implemented in query optimizer, - // use them to detect map writes. - if (!input.Maybe<TCoFlatMap>().Input().Maybe<TKqlReadTableBase>()) { +bool IsSingleKeyStream(const TExprBase& stream, TExprContext&) { + auto asList = stream.Maybe<TCoIterator>().List().Maybe<TCoAsList>(); + if (!asList) { return false; } - auto flatmap = input.Cast<TCoFlatMap>(); - auto read = flatmap.Input().Cast<TKqlReadTableBase>(); + if (asList.Cast().ArgCount() > 1) { + return false; + } + + auto asStruct = asList.Cast().Arg(0).Maybe<TCoAsStruct>(); + if (!asStruct) { + return false; + } + + return true; +} + +const THashSet<TStringBuf> SafeCallables { + TCoJust::CallableName(), + TCoCoalesce::CallableName(), + TCoToOptional::CallableName(), + TCoHead::CallableName(), + TCoLast::CallableName(), + TCoNth::CallableName(), + TCoToList::CallableName(), + TCoAsList::CallableName(), + + TCoMember::CallableName(), + TCoAsStruct::CallableName(), + + TCoNothing::CallableName(), + TCoNull::CallableName(), + TCoDefault::CallableName(), + TCoExists::CallableName(), + + TCoIf::CallableName(), + + TCoDataType::CallableName(), + TCoOptionalType::CallableName(), + + TCoParameter::CallableName(), + + "Concat", + "Substring", +}; + +bool IsStructOrOptionalStruct(const NYql::TTypeAnnotationNode* type) { + if (type->GetKind() == ETypeAnnotationKind::Struct) { + return true; + } + + if (type->GetKind() == ETypeAnnotationKind::Optional) { + return type->Cast<TOptionalExprType>()->GetItemType()->GetKind() == ETypeAnnotationKind::Struct; + } + + return false; +} + +bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input, TExprContext& ctx) { +// #define DBG YQL_CLOG(ERROR, ProviderKqp) +#define DBG TStringBuilder() + + DBG << "--> " << KqpExprToPrettyString(input, ctx); + + auto maybeFlatMap = input.Maybe<TCoFlatMap>(); + if (!maybeFlatMap) { + return false; + } + auto flatmap = maybeFlatMap.Cast(); + + if (!IsStructOrOptionalStruct(flatmap.Lambda().Ref().GetTypeAnn())) { + DBG << " --> FlatMap with expanding lambda: " << *flatmap.Lambda().Ref().GetTypeAnn(); + return false; + } + + auto maybeLookupTable = flatmap.Input().Maybe<TKqpLookupTable>(); + if (!maybeLookupTable) { + maybeLookupTable = flatmap.Input().Maybe<TCoSkipNullMembers>().Input().Maybe<TKqpLookupTable>(); + } + + if (!maybeLookupTable) { + DBG << " --> not FlatMap over KqpLookupTable"; + return false; + } + + auto read = maybeLookupTable.Cast(); + // check same table if (table.Metadata->PathId.ToString() != read.Table().PathId().Value()) { + DBG << " --> not same table"; + return false; + } + + // check keys count + if (!IsSingleKeyStream(read.LookupKeys(), ctx)) { + DBG << " --> not single key stream"; + return false; + } + + // full key (not prefix) + auto* lookupKeyType = GetSeqItemType(read.LookupKeys().Ref().GetTypeAnn()); + if (table.Metadata->KeyColumnNames.size() != lookupKeyType->Cast<TStructExprType>()->GetSize()) { + DBG << " --> not full key"; return false; } @@ -52,7 +144,62 @@ bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input) { } } - return true; + auto lambda = flatmap.Lambda(); + if (!lambda.Ref().IsComplete()) { + return false; + } + + TMaybeNode<TExprBase> notSafeNode; + // white list of callables in lambda + VisitExpr(lambda.Body().Ptr(), + [¬SafeNode] (const TExprNode::TPtr&) { + return !notSafeNode; + }, + [¬SafeNode](const TExprNode::TPtr& node) { + if (notSafeNode) { + return false; + } + if (node->IsCallable()) { + DBG << " --> visit: " << node->Content(); + + auto expr = TExprBase(node); + + if (expr.Maybe<TCoDataCtor>()) { + return true; + } + if (expr.Maybe<TCoCompare>()) { + return true; + } + if (expr.Maybe<TCoAnd>()) { + return true; + } + if (expr.Maybe<TCoOr>()) { + return true; + } + if (expr.Maybe<TCoBinaryArithmetic>()) { + return true; + } + if (expr.Maybe<TCoCountBase>()) { + return true; + } + + if (SafeCallables.contains(node->Content())) { + return true; + } + + // TODO: allowed UDFs + + notSafeNode = expr; + DBG << " --> not safe node: " << node->Content(); + return false; + } + + return true; + }); + + return !notSafeNode; + +#undef DBG } TDqPhyPrecompute BuildPrecomputeStage(TExprBase expr, TExprContext& ctx) { @@ -108,9 +255,10 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const auto& table = kqpCtx.Tables->ExistingTable(kqpCtx.Cluster, node.Table().Path()); auto dqUnion = node.Input().Cast<TDqCnUnionAll>(); - auto input = dqUnion.Output().Stage().Program().Body(); + auto program = dqUnion.Output().Stage().Program(); + auto input = program.Body(); - if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input)) { + if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input, ctx)) { stageInput = Build<TKqpCnMapShard>(ctx, node.Pos()) .Output() .Stage(dqUnion.Output().Stage()) @@ -171,7 +319,7 @@ bool BuildDeleteRowsEffect(const TKqlDeleteRows& node, TExprContext& ctx, const auto dqUnion = node.Input().Cast<TDqCnUnionAll>(); auto input = dqUnion.Output().Stage().Program().Body(); - if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input)) { + if (InplaceUpdateEnabled(*kqpCtx.Config) && IsMapWrite(table, input, ctx)) { stageInput = Build<TKqpCnMapShard>(ctx, node.Pos()) .Output() .Stage(dqUnion.Output().Stage()) diff --git a/ydb/core/kqp/ut/CMakeLists.txt b/ydb/core/kqp/ut/CMakeLists.txt index 7de5aeda44e..42606bd4403 100644 --- a/ydb/core/kqp/ut/CMakeLists.txt +++ b/ydb/core/kqp/ut/CMakeLists.txt @@ -45,6 +45,7 @@ target_sources(ydb-core-kqp-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_locks_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_merge_connection_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_mvcc_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_effects_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_flowcontrol_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/kqp_newengine_ut.cpp diff --git a/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp b/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp new file mode 100644 index 00000000000..79a8839c596 --- /dev/null +++ b/ydb/core/kqp/ut/kqp_newengine_inplace_update_ut.cpp @@ -0,0 +1,421 @@ +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> + +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> + +namespace NKikimr { +namespace NKqp { + +using namespace NYdb; +using namespace NYdb::NTable; + +Y_UNIT_TEST_SUITE(KqpNewEngineInplaceUpdate) { + +void PrepareTable(TSession& session) { + auto ret = session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/InplaceUpdate` ( + Key Uint64, + ValueStr String, + ValueInt Uint64, + ValueDbl Double, + PRIMARY KEY(Key) + ) WITH ( + PARTITION_AT_KEYS = (10) + ) + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(ret.GetStatus(), EStatus::SUCCESS, ret.GetIssues().ToString()); + + auto result = session.ExecuteDataQuery(R"( + UPSERT INTO `/Root/InplaceUpdate` (Key, ValueStr, ValueInt, ValueDbl) VALUES + (1, "One", 100, 101.0), + (20, "Two", 200, 202.0) + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); +} + +void Test(bool enableInplaceUpdate, const TString& query, TParams&& params, const TString& expectedResult, + std::function<void(const Ydb::TableStats::QueryStats&)>&& check) +{ + TKikimrRunner kikimr; + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + PrepareTable(session); + + TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + auto q = TStringBuilder() + << R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = 'true'; + PRAGMA kikimr.OptEnableInplaceUpdate = ')" << (enableInplaceUpdate ? "true" : "false") << "';" << Endl + << query; + + auto result = session.ExecuteDataQuery(q, TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + check(NYdb::TProtoAccessor::GetProto(*result.GetStats())); + + result = session.ExecuteDataQuery(R"( + PRAGMA kikimr.UseNewEngine = 'true'; + SELECT Key, ValueStr, ValueInt, ValueDbl FROM `/Root/InplaceUpdate` ORDER BY Key; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + CompareYson(expectedResult, FormatResultSetYson(result.GetResultSet(0))); +} + +#define ASSERT_LITERAL_PHASE(stats, phaseNo) \ + UNIT_ASSERT_C(stats.query_phases(phaseNo).table_access().empty(), stats.DebugString()); + +#define ASSERT_PHASE(stats, phaseNo, table, readsCnt, updatesCnt) \ + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access().size(), 1, stats.DebugString()); \ + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).name(), table, stats.DebugString()); \ + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).reads().rows(), readsCnt, stats.DebugString()); \ + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).updates().rows(), updatesCnt, stats.DebugString()); \ + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases(phaseNo).table_access(0).partitions_count(), std::max(readsCnt, updatesCnt), stats.DebugString()); + + +Y_UNIT_TEST_TWIN(SingleRowSimple, EnableInplaceUpdate) { + Test( + EnableInplaceUpdate, + R"( DECLARE $key AS Uint64; + DECLARE $value AS String; + + UPDATE `/Root/InplaceUpdate` SET + ValueStr = $value + WHERE Key = $key + )", + TParamsBuilder() + .AddParam("$key").Uint64(1).Build() + .AddParam("$value").String("updated").Build() + .Build(), + R"([ + [[1u];["updated"];[100u];[101.]]; + [[20u];["Two"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + if constexpr (EnableInplaceUpdate) { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0); + ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1); + } + }); +} + +Y_UNIT_TEST_TWIN(SingleRowStr, EnableInplaceUpdate) { + Test( + EnableInplaceUpdate, + R"( DECLARE $key AS Uint64; + DECLARE $value AS String; + + UPDATE `/Root/InplaceUpdate` SET + ValueStr = Substring(ValueStr || $value, 1) + WHERE Key = $key + )", + TParamsBuilder() + .AddParam("$key").Uint64(1).Build() + .AddParam("$value").String("updated").Build() + .Build(), + R"([ + [[1u];["neupdated"];[100u];[101.]]; + [[20u];["Two"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + if constexpr (EnableInplaceUpdate) { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0); + ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1); + } + }); +} + +Y_UNIT_TEST_TWIN(SingleRowArithm, EnableInplaceUpdate) { + Test( + EnableInplaceUpdate, + R"( DECLARE $key AS Uint64; + DECLARE $x AS Uint64; + DECLARE $y AS Double; + + UPDATE `/Root/InplaceUpdate` SET + ValueInt = (ValueInt + $x) * ($x + 1ul), + ValueDbl = (ValueDbl - $y) / ($y + 1.) + WHERE Key = $key + )", + TParamsBuilder() + .AddParam("$key").Uint64(1).Build() + .AddParam("$x").Uint64(10).Build() + .AddParam("$y").Double(5.0).Build() + .Build(), + R"([ + [[1u];["One"];[1210u];[16.]]; + [[20u];["Two"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + if constexpr (EnableInplaceUpdate) { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0); + ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1); + } + }); +} + +Y_UNIT_TEST_TWIN(SingleRowIf, EnableInplaceUpdate) { + Test( + EnableInplaceUpdate, + R"( DECLARE $key AS Uint64; + + $trim = ($v, $min, $max) -> { + RETURN CASE + WHEN $v > $max THEN $max + WHEN $v < $min THEN $min + ELSE $v + END; + }; + + UPDATE `/Root/InplaceUpdate` SET + ValueInt = $trim(ValueInt, 0ul, 10ul) + 1ul, + ValueDbl = $trim(ValueDbl, 0., 10.) / 10.0 + WHERE Key = $key + )", + TParamsBuilder() + .AddParam("$key").Uint64(1).Build() + .Build(), + R"([ + [[1u];["One"];[11u];[1.]]; + [[20u];["Two"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + if constexpr (EnableInplaceUpdate) { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 1, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 1); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0); + ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1); + } + }); +} + +// allow multiple keys in KqpLookupTable to enable this test +Y_UNIT_TEST_TWIN(Negative_SingleRowWithKeyCast, EnableInplaceUpdate) { + Test( + EnableInplaceUpdate, + R"( DECLARE $key AS Uint32; -- not Uint64 + DECLARE $value AS String; + + UPDATE `/Root/InplaceUpdate` SET + ValueStr = $value + WHERE Key = $key + )", + TParamsBuilder() + .AddParam("$key").Uint32(1).Build() + .AddParam("$value").String("updated").Build() + .Build(), + R"([ + [[1u];["updated"];[100u];[101.]]; + [[20u];["Two"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + // if constexpr (EnableInplaceUpdate) { + // UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString()); + // ASSERT_LITERAL_PHASE(stats, 0); + // ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 1, 1); + // } else { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 3, stats.DebugString()); + ASSERT_LITERAL_PHASE(stats, 0); + ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 1, 0); + ASSERT_PHASE(stats, 2, "/Root/InplaceUpdate", 0, 1); + // } + }); +} + +Y_UNIT_TEST_TWIN(Negative_SingleRowWithValueCast, EnableInplaceUpdate) { +/* + ( + (declare $key (DataType 'Uint64)) + (declare $value (DataType 'Int32)) + (let $1 (KqpTable '"/Root/InplaceUpdate" '"72057594046644480:11" '"" '1)) + (let $2 (DataType 'Uint64)) + (let $3 (KqpLookupTable $1 (Iterator (AsList (AsStruct '('"Key" $key)))) '('"Key"))) + (return (FlatMap $3 (lambda '($4) (Just (AsStruct '('"Key" (Member $4 '"Key")) '('"ValueInt" (Just (Convert $value $2)))))))) + ) + + `Convert` is not safe callable, so there is no InplaceUpdate optimization here +*/ + Test( + EnableInplaceUpdate, + R"( DECLARE $key AS Uint64; + DECLARE $value AS Int32; -- not Uint64 + + UPDATE `/Root/InplaceUpdate` SET + ValueInt = $value + WHERE Key = $key + )", + TParamsBuilder() + .AddParam("$key").Uint64(1).Build() + .AddParam("$value").Int32(1).Build() + .Build(), + R"([ + [[1u];["One"];[1u];[101.]]; + [[20u];["Two"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0); + ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1); + }); +} + +Y_UNIT_TEST_TWIN(Negative_SingleRowListFromRange, EnableInplaceUpdate) { + Test( + EnableInplaceUpdate, + R"( DECLARE $key AS Uint64; + + $foo = ($x) -> { + $list = ListFromRange(1, 10); + RETURN $x || ListConcat(Cast($list as List<String>), ".."); + }; + + UPDATE `/Root/InplaceUpdate` SET + ValueStr = $foo(ValueStr) + WHERE Key = $key + )", + TParamsBuilder() + .AddParam("$key").Uint64(1).Build() + .Build(), + R"([ + [[1u];["One1..2..3..4..5..6..7..8..9"];[100u];[101.]]; + [[20u];["Two"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 2, stats.DebugString()); + ASSERT_PHASE(stats, 0, "/Root/InplaceUpdate", 1, 0); + ASSERT_PHASE(stats, 1, "/Root/InplaceUpdate", 0, 1); + }); +} + +// allow multiple keys in KqpLookupTable to enable this test +Y_UNIT_TEST_TWIN(Negative_BatchUpdate, EnableInplaceUpdate) { + Test( + EnableInplaceUpdate, + R"( DECLARE $key1 AS Uint64; + DECLARE $value1 AS String; + DECLARE $key2 AS Uint64; + DECLARE $value2 AS String; + + $foo = ($x, $k1, $v1, $v2) -> { + RETURN CASE $x + WHEN $k1 THEN $v1 + ELSE $v2 + END; + }; + + UPDATE `/Root/InplaceUpdate` SET + ValueStr = $foo(Key, $key1, $value1, $value2) + WHERE Key IN [$key1, $key2] + )", + TParamsBuilder() + .AddParam("$key1").Uint64(1).Build() + .AddParam("$value1").String("updated-1").Build() + .AddParam("$key2").Uint64(20).Build() + .AddParam("$value2").String("updated-2").Build() + .Build(), + R"([ + [[1u];["updated-1"];[100u];[101.]]; + [[20u];["updated-2"];[200u];[202.]] + ])", + [](const Ydb::TableStats::QueryStats& stats) { + // if constexpr (EnableInplaceUpdate) { + // UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 3, stats.DebugString()); + // ASSERT_LITERAL_PHASE(stats, 0); + // ASSERT_LITERAL_PHASE(stats, 1); + // ASSERT_PHASE(stats, 2, "/Root/InplaceUpdate", 2, 2); + // } else { + UNIT_ASSERT_VALUES_EQUAL_C(stats.query_phases().size(), 4, stats.DebugString()); + ASSERT_LITERAL_PHASE(stats, 0); + ASSERT_LITERAL_PHASE(stats, 1); + ASSERT_PHASE(stats, 2, "/Root/InplaceUpdate", 2, 0); + ASSERT_PHASE(stats, 3, "/Root/InplaceUpdate", 0, 2); + // } + }); +} + +Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) { + auto keysLimitSetting = NKikimrKqp::TKqpSetting(); + keysLimitSetting.SetName("_CommitPerShardKeysSizeLimitBytes"); + keysLimitSetting.SetValue("100"); + + TKikimrRunner kikimr({keysLimitSetting}); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + UNIT_ASSERT(session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/Temp` ( + Key Uint32, + Value1 String, + Value2 String, + PRIMARY KEY (Key) + ); + )").GetValueSync().IsSuccess()); + + auto result = session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/Temp` (Key, Value1) VALUES + (1u, "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"), + (3u, "123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto query = Sprintf(R"( + PRAGMA kikimr.UseNewEngine = 'true'; + PRAGMA kikimr.OptEnableInplaceUpdate = '%s'; + + DECLARE $Key AS Uint32; + + UPDATE `/Root/Temp` SET + Value2 = Value1 + WHERE Key = $Key + )", EnableInplaceUpdate ? "true" : "false"); + + auto params = db.GetParamsBuilder() + .AddParam("$Key").Uint32(1).Build() + .Build(); + + result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), params).ExtractValueSync(); + + if constexpr (EnableInplaceUpdate) { + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_REQUEST); + UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR, [](const NYql::TIssue& issue) { + return issue.Message.Contains("READ_SIZE_EXECEEDED"); + })); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + result = session.ExecuteDataQuery(R"( + SELECT Value2 FROM `/Root/Temp` ORDER BY Value2; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + if constexpr (EnableInplaceUpdate) { + CompareYson(R"([[#];[#]])", FormatResultSetYson(result.GetResultSet(0))); + } else { + CompareYson(R"([ + [#];[["123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } +} + +} // suite + +} // namespace NKqp +} // namespace NKikimr
\ No newline at end of file diff --git a/ydb/core/kqp/ut/kqp_newengine_ut.cpp b/ydb/core/kqp/ut/kqp_newengine_ut.cpp index 4e99bea83d2..1043baef5e8 100644 --- a/ydb/core/kqp/ut/kqp_newengine_ut.cpp +++ b/ydb/core/kqp/ut/kqp_newengine_ut.cpp @@ -1592,49 +1592,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { ])", FormatResultSetYson(result.GetResultSet(0))); } - Y_UNIT_TEST(InplaceUpdate) { - TKikimrRunner kikimr; - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = session.ExecuteDataQuery(R"( - PRAGMA kikimr.UseNewEngine = "true"; - PRAGMA kikimr.OptEnableInplaceUpdate = "true"; - - UPDATE [/Root/TwoShard] - SET Value1 = "Updated" - WHERE Value2 = 1; - )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); - - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/TwoShard"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).updates().rows(), 2); - UNIT_ASSERT(stats.query_phases(0).table_access(0).updates().bytes() > 0); - UNIT_ASSERT(stats.query_phases(0).duration_us() > 0); - - result = session.ExecuteDataQuery(R"( - PRAGMA kikimr.UseNewEngine = "true"; - SELECT * FROM [/Root/TwoShard] ORDER BY Key; - )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - - CompareYson(R"([ - [[1u];["One"];[-1]]; - [[2u];["Two"];[0]]; - [[3u];["Updated"];[1]]; - [[4000000001u];["BigOne"];[-1]]; - [[4000000002u];["BigTwo"];[0]]; - [[4000000003u];["Updated"];[1]] - ])", FormatResultSetYson(result.GetResultSet(0))); - } - Y_UNIT_TEST(Delete) { TKikimrRunner kikimr; auto db = kikimr.GetTableClient(); diff --git a/ydb/core/kqp/ut/ya.make b/ydb/core/kqp/ut/ya.make index 609ec3237f8..5aa9abf952a 100644 --- a/ydb/core/kqp/ut/ya.make +++ b/ydb/core/kqp/ut/ya.make @@ -33,6 +33,7 @@ SRCS( kqp_locks_ut.cpp kqp_merge_connection_ut.cpp kqp_mvcc_ut.cpp + kqp_newengine_inplace_update_ut.cpp kqp_newengine_effects_ut.cpp kqp_newengine_flowcontrol_ut.cpp kqp_newengine_ut.cpp |