diff options
| author | ulya-sidorina <[email protected]> | 2022-12-08 12:08:36 +0300 |
|---|---|---|
| committer | ulya-sidorina <[email protected]> | 2022-12-08 12:08:36 +0300 |
| commit | a871e0d3cbc0abef908f17920a4cf6f2b32bbe8e (patch) | |
| tree | 402b9f38a2cf1d888674de43bd1b36a0d0cdbb35 | |
| parent | bddb2f7be1188971cade9973d4c79f1ba9cee946 (diff) | |
support multiple conflicting immediate effects in kqp
fix(kqp): support multiple conflicting effects in kqp
| -rw-r--r-- | ydb/core/kqp/opt/kqp_opt_build_txs.cpp | 32 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_opt_build.cpp | 26 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.cpp | 15 | ||||
| -rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 26 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/kqp_immediate_effects.cpp | 277 | ||||
| -rw-r--r-- | ydb/tests/functional/api/test_insert.py | 10 |
6 files changed, 248 insertions, 138 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 04cf2889bd5..922d2ca1de4 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -466,21 +466,23 @@ public: } TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final { + outputExpr = inputExpr; + if (TKqpPhysicalQuery::Match(inputExpr.Get())) { - outputExpr = inputExpr; return TStatus::Ok; } YQL_CLOG(DEBUG, ProviderKqp) << ">>> TKqpBuildTxsTransformer: " << KqpExprToPrettyString(*inputExpr, ctx); TKqlQuery query(inputExpr); + TVector<TExprBase> queryResults; + for (; CurrentQueryBlockId < query.Blocks().Size(); ++CurrentQueryBlockId) { + const auto& block = query.Blocks().Item(CurrentQueryBlockId); - if (auto status = TryBuildPrecomputeTx(query, outputExpr, ctx)) { - return *status; - } + if (auto status = TryBuildPrecomputeTx(block, outputExpr, outputExpr, ctx)) { + return *status; + } - TVector<TExprBase> queryResults; - for (const auto& block : query.Blocks()) { if (!block.Results().Empty()) { auto tx = BuildTx(block.Results().Ptr(), ctx, false); if (!tx) { @@ -585,7 +587,7 @@ private: return true; } - std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputeDependencies(const TKqlQuery& query) { + std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputeDependencies(const TKqlQueryBlock& queryBlock) { TNodeOnNodeOwnedMap precomputes; TNodeOnNodeOwnedMap dependencies; @@ -632,14 +634,13 @@ private: return true; }; - VisitExpr(query.Ptr(), filter, gather); + VisitExpr(queryBlock.Ptr(), filter, gather); return std::make_pair(std::move(precomputes), std::move(dependencies)); } - TMaybe<TStatus> TryBuildPrecomputeTx(const TKqlQuery& query, TExprNode::TPtr& output, TExprContext& ctx) { - auto [precomputeStagesMap, dependantStagesMap] = GatherPrecomputeDependencies(query); - + TMaybe<TStatus> TryBuildPrecomputeTx(const TKqlQueryBlock& queryBlock, TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { + auto [precomputeStagesMap, dependantStagesMap] = GatherPrecomputeDependencies(queryBlock); if (precomputeStagesMap.empty()) { return {}; } @@ -661,8 +662,8 @@ private: } if (phaseStagesMap.empty()) { - output = query.Ptr(); - ctx.AddError(TIssue(ctx.GetPosition(query.Pos()), "Phase stages is empty")); + output = input; + ctx.AddError(TIssue(ctx.GetPosition(queryBlock.Pos()), "Phase stages is empty")); return TStatus::Error; } @@ -695,7 +696,7 @@ private: } Y_VERIFY_DEBUG(phaseResults.size() == computedInputs.size()); - auto phaseResultsNode = Build<TKqlQueryResultList>(ctx, query.Pos()) + auto phaseResultsNode = Build<TKqlQueryResultList>(ctx, queryBlock.Pos()) .Add(phaseResults) .Done(); @@ -720,7 +721,7 @@ private: replaceMap.emplace(input.Raw(), newInput.Ptr()); } - output = ctx.ReplaceNodes(query.Ptr(), replaceMap); + output = ctx.ReplaceNodes(std::move(input), replaceMap); return TStatus(TStatus::Repeat, true); } @@ -753,6 +754,7 @@ private: TAutoPtr<TKqpBuildTxTransformer> BuildTxTransformer; TAutoPtr<IGraphTransformer> DataTxTransformer; TAutoPtr<IGraphTransformer> ScanTxTransformer; + ui32 CurrentQueryBlockId = 0; }; } // namespace diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index b312ff1eb7b..177a7c88c27 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -67,7 +67,7 @@ struct TKiExploreTxResults { struct TKiQueryBlock { TVector<TExprBase> Results; TVector<TExprBase> Effects; - THashSet<std::string_view> TablesWithEffects; + THashMap<std::string_view, TYdbOperations> TableOperations; bool HasUncommittedChangesRead = false; }; @@ -96,14 +96,19 @@ struct TKiExploreTxResults { } } - void AddEffect(const TExprBase& effect, std::string_view table) { - if (QueryBlocks.empty()) { + void AddEffect(const TExprBase& effect, TYdbOperation op, std::string_view table) { + auto readOp = op & KikimrReadOps(); + auto uncommittedChangesRead = HasModifyOps(table) && readOp; + + if (QueryBlocks.empty() || uncommittedChangesRead) { AddQueryBlock(); } auto& curBlock = QueryBlocks.back(); curBlock.Effects.push_back(effect); - curBlock.TablesWithEffects.insert(table); + curBlock.HasUncommittedChangesRead = uncommittedChangesRead; + auto& currentOps = curBlock.TableOperations[table]; + currentOps |= op; } void AddResult(const TExprBase& result) { @@ -115,13 +120,14 @@ struct TKiExploreTxResults { curBlock.Results.push_back(result); } - bool HasEffects(std::string_view table) { + bool HasModifyOps(std::string_view table) { if (QueryBlocks.empty()) { return false; } auto& curBlock = QueryBlocks.back(); - return curBlock.TablesWithEffects.contains(table); + auto currentOps = curBlock.TableOperations[table]; + return currentOps & KikimrModifyOps(); } void AddQueryBlock() { @@ -177,7 +183,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Select, read.Pos(), ctx)); - if (txRes.HasEffects(table)) { + if (txRes.HasModifyOps(table)) { txRes.AddQueryBlock(); txRes.SetBlockHasUncommittedChangesRead(); } @@ -195,7 +201,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto result = ExploreTx(write.World(), ctx, dataSink, txRes); auto tableOp = GetTableOp(write); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, write.Pos(), ctx)); - txRes.AddEffect(node, table); + txRes.AddEffect(node, tableOp, table); return result; } @@ -209,7 +215,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T txRes.Ops.insert(node.Raw()); auto result = ExploreTx(update.World(), ctx, dataSink, txRes); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Update, update.Pos(), ctx)); - txRes.AddEffect(node, table); + txRes.AddEffect(node, TYdbOperation::Update, table); return result; } @@ -223,7 +229,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T txRes.Ops.insert(node.Raw()); auto result = ExploreTx(del.World(), ctx, dataSink, txRes); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Delete, del.Pos(), ctx)); - txRes.AddEffect(node, table); + txRes.AddEffect(node, TYdbOperation::Delete, table); return result; } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index 40b3b27b9cd..15f23c6f916 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -28,7 +28,6 @@ struct TKikimrData { TYdbOperations DataOps; TYdbOperations ModifyOps; TYdbOperations ReadOps; - TYdbOperations RequireUnmodifiedOps; TMap<TString, NKikimr::NUdf::EDataSlot> SystemColumns; @@ -78,7 +77,10 @@ struct TKikimrData { ReadOps = TYdbOperation::Select | TYdbOperation::Update | - TYdbOperation::Delete; + TYdbOperation::Delete | + TYdbOperation::InsertRevert | + TYdbOperation::InsertAbort | + TYdbOperation::UpdateOn; // TODO: KIKIMR-3206 DataOps = ModifyOps | ReadOps; @@ -93,11 +95,6 @@ struct TKikimrData { TYdbOperation::AlterGroup | TYdbOperation::DropGroup; - RequireUnmodifiedOps = - TYdbOperation::InsertRevert | - TYdbOperation::InsertAbort | - TYdbOperation::UpdateOn; // TODO: KIKIMR-3206 - SystemColumns = { {"_yql_partition_id", NKikimr::NUdf::EDataSlot::Uint64} }; @@ -534,10 +531,6 @@ const TYdbOperations& KikimrReadOps() { return Singleton<TKikimrData>()->ReadOps; } -const TYdbOperations& KikimrRequireUnmodifiedOps() { - return Singleton<TKikimrData>()->RequireUnmodifiedOps; -} - const TMap<TString, NKikimr::NUdf::EDataSlot>& KikimrSystemColumns() { return Singleton<TKikimrData>()->SystemColumns; } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 7c0af677509..492322f2908 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -267,7 +267,6 @@ const TYdbOperations& KikimrSchemeOps(); const TYdbOperations& KikimrDataOps(); const TYdbOperations& KikimrModifyOps(); const TYdbOperations& KikimrReadOps(); -const TYdbOperations& KikimrRequireUnmodifiedOps(); bool AddDmlIssue(const TIssue& issue, TExprContext& ctx); @@ -393,13 +392,6 @@ public: auto& currentOps = TableOperations[table]; bool currentModify = currentOps & KikimrModifyOps(); if (currentModify && !enableImmediateEffects) { - if (KikimrRequireUnmodifiedOps() & newOp) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' can't be performed on previously modified table: " << table; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - if (KikimrReadOps() & newOp) { TString message = TStringBuilder() << "Data modifications previously made to table '" << table << "' in current transaction won't be seen by operation: '" << newOp << "'"; @@ -415,24 +407,6 @@ public: } } - if ((KikimrRequireUnmodifiedOps() & newOp) && isolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { - TString message = TStringBuilder() - << "Operation '" << newOp << "' is only supported with SERIALIZABLE isolation level"; - ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); - return false; - } - - // TODO: KIKIMR-3206 - bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn); - bool newUpdate = newOp == TYdbOperation::Update; - if (currentDelete && newUpdate && !enableImmediateEffects) { - TString message = TStringBuilder() << "Operation '" << newOp - << "' may lead to unexpected results when applied to table with deleted rows: " << table; - if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), ctx)) { - return false; - } - } - currentOps |= newOp; } diff --git a/ydb/core/kqp/ut/kqp_immediate_effects.cpp b/ydb/core/kqp/ut/kqp_immediate_effects.cpp index 1090811e677..5f39fefbe3e 100644 --- a/ydb/core/kqp/ut/kqp_immediate_effects.cpp +++ b/ydb/core/kqp/ut/kqp_immediate_effects.cpp @@ -11,7 +11,7 @@ namespace { AssertSuccessResult(session.ExecuteSchemeQuery(R"( --!syntax_v1 - CREATE TABLE `/Root/TestImmediateEffects` ( + CREATE TABLE TestImmediateEffects ( Key Uint64, Value String, PRIMARY KEY (Key) @@ -21,7 +21,7 @@ namespace { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + INSERT INTO TestImmediateEffects (Key, Value) VALUES (1u, "One"), (2u, "Two"); )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); @@ -45,12 +45,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"), (4u, "Four"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -69,11 +69,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (5u, "Five"); - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (6u, "Six"); - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (7u, "Seven"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (5u, "Five"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (6u, "Six"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (7u, "Seven"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -103,12 +103,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "SomeValue1"), (3u, "SomeValue2"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -138,9 +138,9 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue1"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue1"); + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -157,11 +157,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue11"); - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue2"); - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue22"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue11"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue2"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue22"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -186,12 +186,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"), (4u, "Four"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -210,11 +210,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (5u, "Five"); - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (6u, "Six"); - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (7u, "Seven"); + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (5u, "Five"); + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (6u, "Six"); + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (7u, "Seven"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -244,12 +244,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (3u, "SomeValue1"), (3u, "SomeValue2"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -279,9 +279,9 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue1"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue1"); + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -298,11 +298,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue11"); - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue2"); - REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue22"); + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue11"); + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue2"); + REPLACE INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue22"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -327,12 +327,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"), (4u, "Four"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -351,11 +351,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (5u, "Five"); - INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (6u, "Six"); - INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (7u, "Seven"); + INSERT INTO TestImmediateEffects (Key, Value) VALUES (5u, "Five"); + INSERT INTO TestImmediateEffects (Key, Value) VALUES (6u, "Six"); + INSERT INTO TestImmediateEffects (Key, Value) VALUES (7u, "Seven"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -385,12 +385,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"), (3u, "SomeValue"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, [](const NYql::TIssue& issue) { @@ -414,11 +414,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + INSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, [](const NYql::TIssue& issue) { @@ -442,12 +442,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - UPDATE `/Root/TestImmediateEffects` ON (Key, Value) VALUES + SELECT * FROM TestImmediateEffects; + UPDATE TestImmediateEffects ON (Key, Value) VALUES (1u, "Updated1"), (2u, "Updated2"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -464,14 +464,14 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - UPDATE `/Root/TestImmediateEffects` ON (Key, Value) VALUES + UPDATE TestImmediateEffects ON (Key, Value) VALUES (1u, "Updated3"), (2u, "Updated4"); - UPDATE `/Root/TestImmediateEffects` ON (Key, Value) VALUES + UPDATE TestImmediateEffects ON (Key, Value) VALUES (1u, "Updated5"); - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -496,10 +496,10 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - SELECT * FROM `/Root/TestImmediateEffects`; - DELETE FROM `/Root/TestImmediateEffects` WHERE Key = 2; + SELECT * FROM TestImmediateEffects; + DELETE FROM TestImmediateEffects WHERE Key = 2; - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -511,26 +511,18 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { ])", FormatResultSetYson(result.GetResultSet(1))); } - { + { // multiple effects auto result = session.ExecuteDataQuery(R"( --!syntax_v1 - UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"), (4u, "Four"); - )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } - - { // multiple effects - auto result = session.ExecuteDataQuery(R"( - --!syntax_v1 + DELETE FROM TestImmediateEffects WHERE Key > 3; + DELETE FROM TestImmediateEffects WHERE Key < 3; - DELETE FROM `/Root/TestImmediateEffects` WHERE Key > 3; - DELETE FROM `/Root/TestImmediateEffects` WHERE Key < 3; - - SELECT * FROM `/Root/TestImmediateEffects`; + SELECT * FROM TestImmediateEffects; )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); CompareYson(R"([ @@ -538,6 +530,149 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { ])", FormatResultSetYson(result.GetResultSet(0))); } } + + Y_UNIT_TEST(UpdateAfterUpsert) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateTestTable(session); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four"); + + UPDATE TestImmediateEffects SET Value = "Updated2" WHERE Key = 2; + UPDATE TestImmediateEffects SET Value = "Updated3" WHERE Key = 3; + + SELECT * FROM TestImmediateEffects; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Updated2"]]; + [[3u];["Updated3"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(DeleteAfterUpsert) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateTestTable(session); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four"); + + DELETE FROM TestImmediateEffects WHERE Key = 2; + DELETE FROM TestImmediateEffects WHERE Key = 3; + + SELECT * FROM TestImmediateEffects; + + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "Value2"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Value3"); + + SELECT * FROM TestImmediateEffects; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Value2"]]; + [[3u];["Value3"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(1))); + } + + Y_UNIT_TEST(UpdateAfterInsert) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateTestTable(session); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"); + INSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four"); + + UPDATE TestImmediateEffects SET Value = "Updated2" WHERE Key = 2; + UPDATE TestImmediateEffects SET Value = "Updated3" WHERE Key = 3; + + SELECT * FROM TestImmediateEffects; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Updated2"]]; + [[3u];["Updated3"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(DeleteAfterInsert) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateTestTable(session); + + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"); + INSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four"); + + DELETE FROM TestImmediateEffects WHERE Key = 2; + DELETE FROM TestImmediateEffects WHERE Key = 3; + + SELECT * FROM TestImmediateEffects; + + INSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "Two"); + INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"); + + SELECT * FROM TestImmediateEffects; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]]; + [[3u];["Three"]]; + [[4u];["Four"]] + ])", FormatResultSetYson(result.GetResultSet(1))); + } } } // namespace NKqp diff --git a/ydb/tests/functional/api/test_insert.py b/ydb/tests/functional/api/test_insert.py index 0b743b80f7a..bc223cd7798 100644 --- a/ydb/tests/functional/api/test_insert.py +++ b/ydb/tests/functional/api/test_insert.py @@ -45,7 +45,7 @@ class TestInsertOperations(object): callee, raises( ydb.GenericError, - "Operation .*InsertAbort.* can.*t be performed on previously modified table", + "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name, ) ) @@ -77,7 +77,7 @@ class TestInsertOperations(object): callee, raises( ydb.GenericError, - "Operation .*InsertAbort.* can.*t be performed on previously modified table", + "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name, ) ) @@ -111,7 +111,7 @@ class TestInsertOperations(object): callee, raises( ydb.GenericError, - "Operation .*InsertAbort.* can.*t be performed on previously modified table", + "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name, ) ) @@ -141,7 +141,7 @@ class TestInsertOperations(object): callee, raises( ydb.GenericError, - "Operation .*InsertAbort.* can.*t be performed on previously modified table", + "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name, ) ) @@ -202,7 +202,7 @@ class TestInsertOperations(object): callee, raises( ydb.GenericError, - "Operation .*InsertRevert.* can.*t be performed on previously modified table", + "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name, ) ) |
