summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <[email protected]>2022-12-08 12:08:36 +0300
committerulya-sidorina <[email protected]>2022-12-08 12:08:36 +0300
commita871e0d3cbc0abef908f17920a4cf6f2b32bbe8e (patch)
tree402b9f38a2cf1d888674de43bd1b36a0d0cdbb35
parentbddb2f7be1188971cade9973d4c79f1ba9cee946 (diff)
support multiple conflicting immediate effects in kqp
fix(kqp): support multiple conflicting effects in kqp
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp32
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp26
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.cpp15
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h26
-rw-r--r--ydb/core/kqp/ut/kqp_immediate_effects.cpp277
-rw-r--r--ydb/tests/functional/api/test_insert.py10
6 files changed, 248 insertions, 138 deletions
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 04cf2889bd5..922d2ca1de4 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -466,21 +466,23 @@ public:
}
TStatus DoTransform(TExprNode::TPtr inputExpr, TExprNode::TPtr& outputExpr, TExprContext& ctx) final {
+ outputExpr = inputExpr;
+
if (TKqpPhysicalQuery::Match(inputExpr.Get())) {
- outputExpr = inputExpr;
return TStatus::Ok;
}
YQL_CLOG(DEBUG, ProviderKqp) << ">>> TKqpBuildTxsTransformer: " << KqpExprToPrettyString(*inputExpr, ctx);
TKqlQuery query(inputExpr);
+ TVector<TExprBase> queryResults;
+ for (; CurrentQueryBlockId < query.Blocks().Size(); ++CurrentQueryBlockId) {
+ const auto& block = query.Blocks().Item(CurrentQueryBlockId);
- if (auto status = TryBuildPrecomputeTx(query, outputExpr, ctx)) {
- return *status;
- }
+ if (auto status = TryBuildPrecomputeTx(block, outputExpr, outputExpr, ctx)) {
+ return *status;
+ }
- TVector<TExprBase> queryResults;
- for (const auto& block : query.Blocks()) {
if (!block.Results().Empty()) {
auto tx = BuildTx(block.Results().Ptr(), ctx, false);
if (!tx) {
@@ -585,7 +587,7 @@ private:
return true;
}
- std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputeDependencies(const TKqlQuery& query) {
+ std::pair<TNodeOnNodeOwnedMap, TNodeOnNodeOwnedMap> GatherPrecomputeDependencies(const TKqlQueryBlock& queryBlock) {
TNodeOnNodeOwnedMap precomputes;
TNodeOnNodeOwnedMap dependencies;
@@ -632,14 +634,13 @@ private:
return true;
};
- VisitExpr(query.Ptr(), filter, gather);
+ VisitExpr(queryBlock.Ptr(), filter, gather);
return std::make_pair(std::move(precomputes), std::move(dependencies));
}
- TMaybe<TStatus> TryBuildPrecomputeTx(const TKqlQuery& query, TExprNode::TPtr& output, TExprContext& ctx) {
- auto [precomputeStagesMap, dependantStagesMap] = GatherPrecomputeDependencies(query);
-
+ TMaybe<TStatus> TryBuildPrecomputeTx(const TKqlQueryBlock& queryBlock, TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
+ auto [precomputeStagesMap, dependantStagesMap] = GatherPrecomputeDependencies(queryBlock);
if (precomputeStagesMap.empty()) {
return {};
}
@@ -661,8 +662,8 @@ private:
}
if (phaseStagesMap.empty()) {
- output = query.Ptr();
- ctx.AddError(TIssue(ctx.GetPosition(query.Pos()), "Phase stages is empty"));
+ output = input;
+ ctx.AddError(TIssue(ctx.GetPosition(queryBlock.Pos()), "Phase stages is empty"));
return TStatus::Error;
}
@@ -695,7 +696,7 @@ private:
}
Y_VERIFY_DEBUG(phaseResults.size() == computedInputs.size());
- auto phaseResultsNode = Build<TKqlQueryResultList>(ctx, query.Pos())
+ auto phaseResultsNode = Build<TKqlQueryResultList>(ctx, queryBlock.Pos())
.Add(phaseResults)
.Done();
@@ -720,7 +721,7 @@ private:
replaceMap.emplace(input.Raw(), newInput.Ptr());
}
- output = ctx.ReplaceNodes(query.Ptr(), replaceMap);
+ output = ctx.ReplaceNodes(std::move(input), replaceMap);
return TStatus(TStatus::Repeat, true);
}
@@ -753,6 +754,7 @@ private:
TAutoPtr<TKqpBuildTxTransformer> BuildTxTransformer;
TAutoPtr<IGraphTransformer> DataTxTransformer;
TAutoPtr<IGraphTransformer> ScanTxTransformer;
+ ui32 CurrentQueryBlockId = 0;
};
} // namespace
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index b312ff1eb7b..177a7c88c27 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -67,7 +67,7 @@ struct TKiExploreTxResults {
struct TKiQueryBlock {
TVector<TExprBase> Results;
TVector<TExprBase> Effects;
- THashSet<std::string_view> TablesWithEffects;
+ THashMap<std::string_view, TYdbOperations> TableOperations;
bool HasUncommittedChangesRead = false;
};
@@ -96,14 +96,19 @@ struct TKiExploreTxResults {
}
}
- void AddEffect(const TExprBase& effect, std::string_view table) {
- if (QueryBlocks.empty()) {
+ void AddEffect(const TExprBase& effect, TYdbOperation op, std::string_view table) {
+ auto readOp = op & KikimrReadOps();
+ auto uncommittedChangesRead = HasModifyOps(table) && readOp;
+
+ if (QueryBlocks.empty() || uncommittedChangesRead) {
AddQueryBlock();
}
auto& curBlock = QueryBlocks.back();
curBlock.Effects.push_back(effect);
- curBlock.TablesWithEffects.insert(table);
+ curBlock.HasUncommittedChangesRead = uncommittedChangesRead;
+ auto& currentOps = curBlock.TableOperations[table];
+ currentOps |= op;
}
void AddResult(const TExprBase& result) {
@@ -115,13 +120,14 @@ struct TKiExploreTxResults {
curBlock.Results.push_back(result);
}
- bool HasEffects(std::string_view table) {
+ bool HasModifyOps(std::string_view table) {
if (QueryBlocks.empty()) {
return false;
}
auto& curBlock = QueryBlocks.back();
- return curBlock.TablesWithEffects.contains(table);
+ auto currentOps = curBlock.TableOperations[table];
+ return currentOps & KikimrModifyOps();
}
void AddQueryBlock() {
@@ -177,7 +183,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Select, read.Pos(), ctx));
- if (txRes.HasEffects(table)) {
+ if (txRes.HasModifyOps(table)) {
txRes.AddQueryBlock();
txRes.SetBlockHasUncommittedChangesRead();
}
@@ -195,7 +201,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto result = ExploreTx(write.World(), ctx, dataSink, txRes);
auto tableOp = GetTableOp(write);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, write.Pos(), ctx));
- txRes.AddEffect(node, table);
+ txRes.AddEffect(node, tableOp, table);
return result;
}
@@ -209,7 +215,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(update.World(), ctx, dataSink, txRes);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Update, update.Pos(), ctx));
- txRes.AddEffect(node, table);
+ txRes.AddEffect(node, TYdbOperation::Update, table);
return result;
}
@@ -223,7 +229,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
txRes.Ops.insert(node.Raw());
auto result = ExploreTx(del.World(), ctx, dataSink, txRes);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Delete, del.Pos(), ctx));
- txRes.AddEffect(node, table);
+ txRes.AddEffect(node, TYdbOperation::Delete, table);
return result;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
index 40b3b27b9cd..15f23c6f916 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp
@@ -28,7 +28,6 @@ struct TKikimrData {
TYdbOperations DataOps;
TYdbOperations ModifyOps;
TYdbOperations ReadOps;
- TYdbOperations RequireUnmodifiedOps;
TMap<TString, NKikimr::NUdf::EDataSlot> SystemColumns;
@@ -78,7 +77,10 @@ struct TKikimrData {
ReadOps =
TYdbOperation::Select |
TYdbOperation::Update |
- TYdbOperation::Delete;
+ TYdbOperation::Delete |
+ TYdbOperation::InsertRevert |
+ TYdbOperation::InsertAbort |
+ TYdbOperation::UpdateOn; // TODO: KIKIMR-3206
DataOps = ModifyOps | ReadOps;
@@ -93,11 +95,6 @@ struct TKikimrData {
TYdbOperation::AlterGroup |
TYdbOperation::DropGroup;
- RequireUnmodifiedOps =
- TYdbOperation::InsertRevert |
- TYdbOperation::InsertAbort |
- TYdbOperation::UpdateOn; // TODO: KIKIMR-3206
-
SystemColumns = {
{"_yql_partition_id", NKikimr::NUdf::EDataSlot::Uint64}
};
@@ -534,10 +531,6 @@ const TYdbOperations& KikimrReadOps() {
return Singleton<TKikimrData>()->ReadOps;
}
-const TYdbOperations& KikimrRequireUnmodifiedOps() {
- return Singleton<TKikimrData>()->RequireUnmodifiedOps;
-}
-
const TMap<TString, NKikimr::NUdf::EDataSlot>& KikimrSystemColumns() {
return Singleton<TKikimrData>()->SystemColumns;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 7c0af677509..492322f2908 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -267,7 +267,6 @@ const TYdbOperations& KikimrSchemeOps();
const TYdbOperations& KikimrDataOps();
const TYdbOperations& KikimrModifyOps();
const TYdbOperations& KikimrReadOps();
-const TYdbOperations& KikimrRequireUnmodifiedOps();
bool AddDmlIssue(const TIssue& issue, TExprContext& ctx);
@@ -393,13 +392,6 @@ public:
auto& currentOps = TableOperations[table];
bool currentModify = currentOps & KikimrModifyOps();
if (currentModify && !enableImmediateEffects) {
- if (KikimrRequireUnmodifiedOps() & newOp) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' can't be performed on previously modified table: " << table;
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
if (KikimrReadOps() & newOp) {
TString message = TStringBuilder() << "Data modifications previously made to table '" << table
<< "' in current transaction won't be seen by operation: '" << newOp << "'";
@@ -415,24 +407,6 @@ public:
}
}
- if ((KikimrRequireUnmodifiedOps() & newOp) && isolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
- TString message = TStringBuilder()
- << "Operation '" << newOp << "' is only supported with SERIALIZABLE isolation level";
- ctx.AddError(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message));
- return false;
- }
-
- // TODO: KIKIMR-3206
- bool currentDelete = currentOps & (TYdbOperation::Delete | TYdbOperation::DeleteOn);
- bool newUpdate = newOp == TYdbOperation::Update;
- if (currentDelete && newUpdate && !enableImmediateEffects) {
- TString message = TStringBuilder() << "Operation '" << newOp
- << "' may lead to unexpected results when applied to table with deleted rows: " << table;
- if (!AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_UPDATE_TABLE_WITH_DELETES, message), ctx)) {
- return false;
- }
- }
-
currentOps |= newOp;
}
diff --git a/ydb/core/kqp/ut/kqp_immediate_effects.cpp b/ydb/core/kqp/ut/kqp_immediate_effects.cpp
index 1090811e677..5f39fefbe3e 100644
--- a/ydb/core/kqp/ut/kqp_immediate_effects.cpp
+++ b/ydb/core/kqp/ut/kqp_immediate_effects.cpp
@@ -11,7 +11,7 @@ namespace {
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
- CREATE TABLE `/Root/TestImmediateEffects` (
+ CREATE TABLE TestImmediateEffects (
Key Uint64,
Value String,
PRIMARY KEY (Key)
@@ -21,7 +21,7 @@ namespace {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES
(1u, "One"),
(2u, "Two");
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
@@ -45,12 +45,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
(3u, "Three"),
(4u, "Four");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -69,11 +69,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (5u, "Five");
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (6u, "Six");
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (7u, "Seven");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (5u, "Five");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (6u, "Six");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (7u, "Seven");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -103,12 +103,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
(3u, "SomeValue1"),
(3u, "SomeValue2");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -138,9 +138,9 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue1");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue1");
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -157,11 +157,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue11");
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue2");
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue22");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue11");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue2");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue22");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -186,12 +186,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES
(3u, "Three"),
(4u, "Four");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -210,11 +210,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (5u, "Five");
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (6u, "Six");
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (7u, "Seven");
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES (5u, "Five");
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES (6u, "Six");
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES (7u, "Seven");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -244,12 +244,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES
(3u, "SomeValue1"),
(3u, "SomeValue2");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -279,9 +279,9 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue1");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue1");
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -298,11 +298,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (1u, "SomeValue11");
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue2");
- REPLACE INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (2u, "SomeValue22");
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES (1u, "SomeValue11");
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue2");
+ REPLACE INTO TestImmediateEffects (Key, Value) VALUES (2u, "SomeValue22");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -327,12 +327,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES
(3u, "Three"),
(4u, "Four");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -351,11 +351,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (5u, "Five");
- INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (6u, "Six");
- INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES (7u, "Seven");
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (5u, "Five");
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (6u, "Six");
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (7u, "Seven");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -385,12 +385,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES
(3u, "Three"),
(3u, "SomeValue");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, [](const NYql::TIssue& issue) {
@@ -414,11 +414,11 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- INSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES
(2u, "SomeValue");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
UNIT_ASSERT(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, [](const NYql::TIssue& issue) {
@@ -442,12 +442,12 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- UPDATE `/Root/TestImmediateEffects` ON (Key, Value) VALUES
+ SELECT * FROM TestImmediateEffects;
+ UPDATE TestImmediateEffects ON (Key, Value) VALUES
(1u, "Updated1"),
(2u, "Updated2");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -464,14 +464,14 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- UPDATE `/Root/TestImmediateEffects` ON (Key, Value) VALUES
+ UPDATE TestImmediateEffects ON (Key, Value) VALUES
(1u, "Updated3"),
(2u, "Updated4");
- UPDATE `/Root/TestImmediateEffects` ON (Key, Value) VALUES
+ UPDATE TestImmediateEffects ON (Key, Value) VALUES
(1u, "Updated5");
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -496,10 +496,10 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- SELECT * FROM `/Root/TestImmediateEffects`;
- DELETE FROM `/Root/TestImmediateEffects` WHERE Key = 2;
+ SELECT * FROM TestImmediateEffects;
+ DELETE FROM TestImmediateEffects WHERE Key = 2;
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -511,26 +511,18 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
])", FormatResultSetYson(result.GetResultSet(1)));
}
- {
+ { // multiple effects
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
- UPSERT INTO `/Root/TestImmediateEffects` (Key, Value) VALUES
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES
(3u, "Three"),
(4u, "Four");
- )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
- }
-
- { // multiple effects
- auto result = session.ExecuteDataQuery(R"(
- --!syntax_v1
+ DELETE FROM TestImmediateEffects WHERE Key > 3;
+ DELETE FROM TestImmediateEffects WHERE Key < 3;
- DELETE FROM `/Root/TestImmediateEffects` WHERE Key > 3;
- DELETE FROM `/Root/TestImmediateEffects` WHERE Key < 3;
-
- SELECT * FROM `/Root/TestImmediateEffects`;
+ SELECT * FROM TestImmediateEffects;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([
@@ -538,6 +530,149 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
])", FormatResultSetYson(result.GetResultSet(0)));
}
}
+
+ Y_UNIT_TEST(UpdateAfterUpsert) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateTestTable(session);
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four");
+
+ UPDATE TestImmediateEffects SET Value = "Updated2" WHERE Key = 2;
+ UPDATE TestImmediateEffects SET Value = "Updated3" WHERE Key = 3;
+
+ SELECT * FROM TestImmediateEffects;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Updated2"]];
+ [[3u];["Updated3"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST(DeleteAfterUpsert) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateTestTable(session);
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four");
+
+ DELETE FROM TestImmediateEffects WHERE Key = 2;
+ DELETE FROM TestImmediateEffects WHERE Key = 3;
+
+ SELECT * FROM TestImmediateEffects;
+
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "Value2");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Value3");
+
+ SELECT * FROM TestImmediateEffects;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Value2"]];
+ [[3u];["Value3"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(1)));
+ }
+
+ Y_UNIT_TEST(UpdateAfterInsert) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateTestTable(session);
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three");
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four");
+
+ UPDATE TestImmediateEffects SET Value = "Updated2" WHERE Key = 2;
+ UPDATE TestImmediateEffects SET Value = "Updated3" WHERE Key = 3;
+
+ SELECT * FROM TestImmediateEffects;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Updated2"]];
+ [[3u];["Updated3"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST(DeleteAfterInsert) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateTestTable(session);
+
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three");
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (4u, "Four");
+
+ DELETE FROM TestImmediateEffects WHERE Key = 2;
+ DELETE FROM TestImmediateEffects WHERE Key = 3;
+
+ SELECT * FROM TestImmediateEffects;
+
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (2u, "Two");
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three");
+
+ SELECT * FROM TestImmediateEffects;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]];
+ [[3u];["Three"]];
+ [[4u];["Four"]]
+ ])", FormatResultSetYson(result.GetResultSet(1)));
+ }
}
} // namespace NKqp
diff --git a/ydb/tests/functional/api/test_insert.py b/ydb/tests/functional/api/test_insert.py
index 0b743b80f7a..bc223cd7798 100644
--- a/ydb/tests/functional/api/test_insert.py
+++ b/ydb/tests/functional/api/test_insert.py
@@ -45,7 +45,7 @@ class TestInsertOperations(object):
callee,
raises(
ydb.GenericError,
- "Operation .*InsertAbort.* can.*t be performed on previously modified table",
+ "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name,
)
)
@@ -77,7 +77,7 @@ class TestInsertOperations(object):
callee,
raises(
ydb.GenericError,
- "Operation .*InsertAbort.* can.*t be performed on previously modified table",
+ "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name,
)
)
@@ -111,7 +111,7 @@ class TestInsertOperations(object):
callee,
raises(
ydb.GenericError,
- "Operation .*InsertAbort.* can.*t be performed on previously modified table",
+ "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name,
)
)
@@ -141,7 +141,7 @@ class TestInsertOperations(object):
callee,
raises(
ydb.GenericError,
- "Operation .*InsertAbort.* can.*t be performed on previously modified table",
+ "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name,
)
)
@@ -202,7 +202,7 @@ class TestInsertOperations(object):
callee,
raises(
ydb.GenericError,
- "Operation .*InsertRevert.* can.*t be performed on previously modified table",
+ "Data modifications previously made to table .*%s.* in current transaction won.*t be seen" % name,
)
)