diff options
author | ulya-sidorina <yulia@ydb.tech> | 2022-12-14 14:53:20 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2022-12-14 14:53:20 +0300 |
commit | 6aec14798ad91ed132f3da681c3d5b9c6fb2240d (patch) | |
tree | c5b223b8a792ec0e7938b7c676cd03acea5eec70 | |
parent | 7b2317ba8baa3414534e276c89e74f3033be9f69 (diff) | |
download | ydb-6aec14798ad91ed132f3da681c3d5b9c6fb2240d.tar.gz |
Support immediate effects for secondary indexes
feature(kqp): support immediate effects for secondary indexes
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_opt.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_opt_build.cpp | 230 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp | 209 |
4 files changed, 404 insertions, 39 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt.cpp b/ydb/core/kqp/provider/yql_kikimr_opt.cpp index f38393e64a..ca3dfce32e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt.cpp @@ -75,7 +75,7 @@ TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr if (auto maybeDatasink = node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) { auto cluster = TString(maybeDatasink.Cast().Cluster()); - ret = KiBuildQuery(node, ctx); + ret = KiBuildQuery(node, ctx, sessionCtx->TablesPtr()); if (ret != inputNode) { return ret; diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 86d4aab743..983cebbb7c 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -63,11 +63,19 @@ ui64 GetResultRowsLimit(const TResWriteBase& resWrite) { return 0; } +enum class TPrimitiveYdbOperation : ui32 { + Read = 1 << 0, + Write = 1 << 1 +}; + +Y_DECLARE_FLAGS(TPrimitiveYdbOperations, TPrimitiveYdbOperation) +Y_DECLARE_OPERATORS_FOR_FLAGS(TPrimitiveYdbOperations) + struct TKiExploreTxResults { struct TKiQueryBlock { TVector<TExprBase> Results; TVector<TExprBase> Effects; - THashMap<TString, TYdbOperations> TableOperations; + THashMap<TString, TPrimitiveYdbOperations> TableOperations; bool HasUncommittedChangesRead = false; }; @@ -96,9 +104,114 @@ struct TKiExploreTxResults { } } - void AddEffect(const TExprBase& effect, TYdbOperation op, std::string_view table) { - auto readOp = op & KikimrReadOps(); - auto uncommittedChangesRead = HasModifyOps(table) && readOp; + void AddReadOpToQueryBlock(const TKikimrKey& key, const TCoAtomList& readColumns, TKikimrTableMetadataPtr tableMeta) { + YQL_ENSURE(tableMeta, "Empty table metadata"); + + bool uncommittedChangesRead = false; + if (key.GetView()) { + const auto& indexName = key.GetView().GetRef(); + const auto indexTablePath = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, indexName); + + auto indexIt = std::find_if(tableMeta->Indexes.begin(), tableMeta->Indexes.end(), [&indexName](const auto& index){ + return index.Name == indexName; + }); + YQL_ENSURE(indexIt != tableMeta->Indexes.end(), "Index not found"); + + THashSet<TString> indexColumns; + indexColumns.reserve(indexIt->KeyColumns.size() + indexIt->DataColumns.size()); + for (const auto& keyColumn : indexIt->KeyColumns) { + indexColumns.insert(keyColumn); + } + + for (const auto& column : indexIt->DataColumns) { + indexColumns.insert(column); + } + + bool needMainTableRead = false; + for (const auto& col : readColumns) { + if (!indexColumns.contains(col.StringValue())) { + needMainTableRead = true; + break; + } + } + + uncommittedChangesRead = HasWriteOps(indexTablePath) || (needMainTableRead && HasWriteOps(tableMeta->Name)); + } else { + uncommittedChangesRead = HasWriteOps(tableMeta->Name); + } + + if (uncommittedChangesRead) { + AddQueryBlock(); + SetBlockHasUncommittedChangesRead(); + } + } + + void AddWriteOpToQueryBlock(const TExprBase& effect, TKikimrTableMetadataPtr tableMeta, bool needMainTableRead) { + YQL_ENSURE(tableMeta, "Empty table metadata"); + + THashMap<TString, TPrimitiveYdbOperations> ops; + if (needMainTableRead) { + ops[tableMeta->Name] |= TPrimitiveYdbOperation::Read; + } + ops[tableMeta->Name] |= TPrimitiveYdbOperation::Write; + + for (const auto& index : tableMeta->Indexes) { + if (!index.ItUsedForWrite()) { + continue; + } + + const auto indexTable = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, index.Name); + + ops[tableMeta->Name] |= TPrimitiveYdbOperation::Read; + ops[indexTable] = TPrimitiveYdbOperation::Write; + } + + AddEffect(effect, ops); + } + + void AddUpdateOpToQueryBlock(const TExprBase& effect, TKikimrTableMetadataPtr tableMeta, + const THashSet<std::string_view>& updateColumns) { + YQL_ENSURE(tableMeta, "Empty table metadata"); + + THashMap<TString, TPrimitiveYdbOperations> ops; + // read and upsert rows into main table + ops[tableMeta->Name] = TPrimitiveYdbOperation::Read | TPrimitiveYdbOperation::Write; + + for (const auto& index : tableMeta->Indexes) { + if (!index.ItUsedForWrite()) { + continue; + } + + const auto indexTable = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, index.Name); + for (const auto& column : index.KeyColumns) { + if (updateColumns.contains(column)) { + // delete old index values and upsert rows into index table + ops[indexTable] = TPrimitiveYdbOperation::Write; + break; + } + } + + for (const auto& column : index.DataColumns) { + if (updateColumns.contains(column)) { + // upsert rows into index table + ops[indexTable] = TPrimitiveYdbOperation::Write; + break; + } + } + } + + AddEffect(effect, ops); + } + + void AddEffect(const TExprBase& effect, const THashMap<TString, TPrimitiveYdbOperations>& ops) { + bool uncommittedChangesRead = false; + for (const auto& [table, op] : ops) { + auto readOp = op & TPrimitiveYdbOperation::Read; + if (readOp && HasWriteOps(table)) { + uncommittedChangesRead = true; + break; + } + } if (QueryBlocks.empty() || uncommittedChangesRead) { AddQueryBlock(); @@ -107,8 +220,11 @@ struct TKiExploreTxResults { auto& curBlock = QueryBlocks.back(); curBlock.Effects.push_back(effect); curBlock.HasUncommittedChangesRead = uncommittedChangesRead; - auto& currentOps = curBlock.TableOperations[table]; - currentOps |= op; + + for (const auto& [table, op] : ops) { + auto& currentOps = curBlock.TableOperations[table]; + currentOps |= op; + } } void AddResult(const TExprBase& result) { @@ -120,14 +236,14 @@ struct TKiExploreTxResults { curBlock.Results.push_back(result); } - bool HasModifyOps(std::string_view table) { + bool HasWriteOps(std::string_view table) { if (QueryBlocks.empty()) { return false; } auto& curBlock = QueryBlocks.back(); auto currentOps = curBlock.TableOperations[table]; - return currentOps & KikimrModifyOps(); + return currentOps & TPrimitiveYdbOperation::Write; } void AddQueryBlock() { @@ -144,7 +260,9 @@ struct TKiExploreTxResults { : HasExecute(false) {} }; -bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes) { +bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes, + TIntrusivePtr<TKikimrTablesData> tablesData) { + if (txRes.Ops.cend() != txRes.Ops.find(node.Raw())) { return true; } @@ -156,7 +274,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T if (auto maybeLeft = node.Maybe<TCoLeft>()) { txRes.Ops.insert(node.Raw()); - return ExploreTx(maybeLeft.Cast().Input(), ctx, dataSink, txRes); + return ExploreTx(maybeLeft.Cast().Input(), ctx, dataSink, txRes, tablesData); } auto checkDataSource = [dataSink] (const TKiDataSource& ds) { @@ -180,13 +298,16 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T YQL_ENSURE(key.GetKeyType() == TKikimrKey::Type::Table); auto table = key.GetTablePath(); txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes); + auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Select, read.Pos(), ctx)); - if (txRes.HasModifyOps(table)) { - txRes.AddQueryBlock(); - txRes.SetBlockHasUncommittedChangesRead(); - } + + YQL_ENSURE(tablesData); + const auto& tableData = tablesData->ExistingTable(cluster, table); + YQL_ENSURE(tableData.Metadata); + auto readColumns = read.GetSelectColumns(ctx, tableData); + txRes.AddReadOpToQueryBlock(key, readColumns, tableData.Metadata); + return result; } @@ -198,10 +319,27 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto table = write.Table().Value(); txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(write.World(), ctx, dataSink, txRes); + auto result = ExploreTx(write.World(), ctx, dataSink, txRes, tablesData); auto tableOp = GetTableOp(write); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, write.Pos(), ctx)); - txRes.AddEffect(node, tableOp, table); + + YQL_ENSURE(tablesData); + const auto& tableData = tablesData->ExistingTable(cluster, table); + YQL_ENSURE(tableData.Metadata); + + if (tableOp == TYdbOperation::UpdateOn) { + auto inputColumnsSetting = GetSetting(write.Settings().Ref(), "input_columns"); + YQL_ENSURE(inputColumnsSetting); + auto inputColumns = TCoNameValueTuple(inputColumnsSetting).Value().Cast<TCoAtomList>(); + THashSet<std::string_view> updateColumns; + for (const auto& column : inputColumns) { + updateColumns.emplace(column); + } + txRes.AddUpdateOpToQueryBlock(node, tableData.Metadata, updateColumns); + } else { + txRes.AddWriteOpToQueryBlock(node, tableData.Metadata, tableOp & KikimrReadOps()); + } + return result; } @@ -213,9 +351,21 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto table = update.Table().Value(); txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(update.World(), ctx, dataSink, txRes); - txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Update, update.Pos(), ctx)); - txRes.AddEffect(node, TYdbOperation::Update, table); + auto result = ExploreTx(update.World(), ctx, dataSink, txRes, tablesData); + const auto tableOp = TYdbOperation::Update; + txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, update.Pos(), ctx)); + + YQL_ENSURE(tablesData); + const auto& tableData = tablesData->ExistingTable(cluster, table); + YQL_ENSURE(tableData.Metadata); + + THashSet<std::string_view> updateColumns; + const auto& updateStructType = update.Update().Ref().GetTypeAnn()->Cast<TStructExprType>(); + for (const auto& item : updateStructType->GetItems()) { + updateColumns.emplace(item->GetName()); + } + txRes.AddUpdateOpToQueryBlock(node, tableData.Metadata, updateColumns); + return result; } @@ -227,9 +377,15 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto table = del.Table().Value(); txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(del.World(), ctx, dataSink, txRes); - txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Delete, del.Pos(), ctx)); - txRes.AddEffect(node, TYdbOperation::Delete, table); + auto result = ExploreTx(del.World(), ctx, dataSink, txRes, tablesData); + const auto tableOp = TYdbOperation::Delete; + txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, del.Pos(), ctx)); + + YQL_ENSURE(tablesData); + const auto& tableData = tablesData->ExistingTable(cluster, table); + YQL_ENSURE(tableData.Metadata); + txRes.AddWriteOpToQueryBlock(node, tableData.Metadata, tableOp & KikimrReadOps()); + return result; } @@ -241,7 +397,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto table = create.Table().Value(); txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(create.World(), ctx, dataSink, txRes); + auto result = ExploreTx(create.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::CreateTable, create.Pos(), ctx)); return result; } @@ -254,7 +410,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto table = drop.Table().Value(); txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(drop.World(), ctx, dataSink, txRes); + auto result = ExploreTx(drop.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::DropTable, drop.Pos(), ctx)); return result; } @@ -267,7 +423,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T auto table = alter.Table().Value(); txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(alter.World(), ctx, dataSink, txRes); + auto result = ExploreTx(alter.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::AlterTable, alter.Pos(), ctx)); return result; } @@ -279,7 +435,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(createUser.World(), ctx, dataSink, txRes); + auto result = ExploreTx(createUser.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::CreateUser, createUser.Pos(), ctx)); return result; } @@ -291,7 +447,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(alterUser.World(), ctx, dataSink, txRes); + auto result = ExploreTx(alterUser.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::AlterUser, alterUser.Pos(), ctx)); return result; } @@ -303,7 +459,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(dropUser.World(), ctx, dataSink, txRes); + auto result = ExploreTx(dropUser.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::DropUser, dropUser.Pos(), ctx)); return result; } @@ -315,7 +471,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(createGroup.World(), ctx, dataSink, txRes); + auto result = ExploreTx(createGroup.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::CreateGroup, createGroup.Pos(), ctx)); return result; } @@ -327,7 +483,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(alterGroup.World(), ctx, dataSink, txRes); + auto result = ExploreTx(alterGroup.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::AlterGroup, alterGroup.Pos(), ctx)); return result; } @@ -339,7 +495,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } txRes.Ops.insert(node.Raw()); - auto result = ExploreTx(dropGroup.World(), ctx, dataSink, txRes); + auto result = ExploreTx(dropGroup.World(), ctx, dataSink, txRes, tablesData); txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::DropGroup, dropGroup.Pos(), ctx)); return result; } @@ -362,13 +518,13 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T return true; } - return ExploreTx(commit.World(), ctx, dataSink, txRes); + return ExploreTx(commit.World(), ctx, dataSink, txRes, tablesData); } if (auto maybeSync = node.Maybe<TCoSync>()) { txRes.Ops.insert(node.Raw()); for (auto child : maybeSync.Cast()) { - if (!ExploreTx(child, ctx, dataSink, txRes)) { + if (!ExploreTx(child, ctx, dataSink, txRes, tablesData)) { return false; } @@ -380,7 +536,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T node.Maybe<TResPull>()) { txRes.Ops.insert(node.Raw()); - bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes); + bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes, tablesData); txRes.AddResult(node); return result; } @@ -534,7 +690,7 @@ TKiDataQuery MakeKiDataQuery(TExprBase node, const TKiExploreTxResults& txExplor } // namespace -TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx) { +TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData) { if (!node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) { return node.Ptr(); } @@ -544,7 +700,7 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx) { auto kiDataSink = commit.DataSink().Cast<TKiDataSink>(); TKiExploreTxResults txExplore; - if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore)) { + if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData)) { return node.Ptr(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h index e989605dac..d1d8ba2139 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h @@ -174,7 +174,7 @@ void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperat TVector<NKqpProto::TKqpTableInfo>& infos); // Optimizer rules -TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx); +TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData); TExprNode::TPtr KiBuildResult(NNodes::TExprBase node, const TString& cluster, TExprContext& ctx); const THashSet<TStringBuf>& KikimrDataSourceFunctions(); diff --git a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp index 5f39fefbe3..9fa4704b1d 100644 --- a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp @@ -673,6 +673,215 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) { [[4u];["Four"]] ])", FormatResultSetYson(result.GetResultSet(1))); } + + Y_UNIT_TEST(UpsertAfterInsert) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateTestTable(session); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three"); + UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "NewValue3"); + + SELECT * FROM TestImmediateEffects; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]]; + [[3u];["NewValue3"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + Y_UNIT_TEST(UpsertAfterInsertWithIndex) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateSampleTablesWithIndex(session); + + { // secondary key + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO SecondaryKeys (Key, Fk, Value) VALUES + (6u, 6u, "Payload6"); + + UPSERT INTO SecondaryKeys (Key, Fk, Value) VALUES + (6u, 60u, "Payload60"); + + SELECT * FROM SecondaryKeys VIEW Index; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [#;#;["Payload8"]]; + [#;[7];["Payload7"]]; + [[1];[1];["Payload1"]]; + [[2];[2];["Payload2"]]; + [[5];[5];["Payload5"]]; + [[60];[6];["Payload60"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + { // secondary complex keys + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO SecondaryComplexKeys (Key, Fk1, Fk2, Value) VALUES + (8u, 8u, "Fk8", "Payload8"); + + UPSERT INTO SecondaryComplexKeys (Key, Fk1, Fk2) VALUES + (8u, 8u, "Fk9"); + + SELECT * FROM SecondaryComplexKeys VIEW Index; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [#;#;#;["Payload8"]]; + [#;["Fk7"];[7];["Payload7"]]; + [[1];["Fk1"];[1];["Payload1"]]; + [[2];["Fk2"];[2];["Payload2"]]; + [[5];["Fk5"];[5];["Payload5"]]; + [[8];["Fk9"];[8];["Payload8"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + + { // secondary index with data column + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + INSERT INTO SecondaryWithDataColumns (Key, Index2, Value) VALUES + ("Primary2", "Secondary2", "Value2"); + + UPSERT INTO SecondaryWithDataColumns (Key, Index2, Value) VALUES + ("Primary2", "Secondary22", "Value22"); + + SELECT * FROM SecondaryWithDataColumns VIEW Index; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [#;["Secondary1"];["Primary1"];["Value1"]]; + [#;["Secondary22"];["Primary2"];["Value22"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + } + } + + Y_UNIT_TEST(DeleteOnAfterInsertWithIndex) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateSampleTablesWithIndex(session); + + { // secondary key + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM SecondaryKeys; + + INSERT INTO SecondaryKeys (Key, Fk, Value) VALUES + (6u, 6u, "Payload6"); + + DELETE FROM SecondaryKeys ON (Key) VALUES (6u); + + SELECT * FROM SecondaryKeys; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1))); + } + + { // secondary complex keys + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM SecondaryComplexKeys VIEW Index; + + INSERT INTO SecondaryComplexKeys (Key, Fk1, Fk2, Value) VALUES + (8u, 8u, "Fk8", "Payload8"); + + DELETE FROM SecondaryComplexKeys ON (Key) VALUES (8u); + + SELECT * FROM SecondaryComplexKeys VIEW Index; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1))); + } + + { // secondary index with data column + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM SecondaryWithDataColumns VIEW Index; + + INSERT INTO SecondaryWithDataColumns (Key, Index2, Value) VALUES + ("Primary2", "Secondary2", "Value2"); + + DELETE FROM SecondaryWithDataColumns ON (Key) VALUES ("Primary2"); + + SELECT * FROM SecondaryWithDataColumns VIEW Index; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1))); + } + } + + Y_UNIT_TEST(MultipleEffectsWithIndex) { + auto serverSettings = TKikimrSettings() + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpImmediateEffects(true); + TKikimrRunner kikimr(serverSettings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateSampleTablesWithIndex(session); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM SecondaryKeys VIEW Index; + + INSERT INTO SecondaryKeys (Key, Fk, Value) VALUES + (10u, 10u, "Payload10"); + + UPSERT INTO SecondaryKeys (Key, Fk, Value) VALUES + (20u, 20u, "Payload20"); + + SELECT * FROM SecondaryKeys VIEW Index; + + UPDATE SecondaryKeys ON (Key, Fk) VALUES + (20u, 21u); + + UPDATE SecondaryKeys SET Fk = 20u WHERE Key = 20u; + + SELECT * FROM SecondaryKeys VIEW Index; + + DELETE FROM SecondaryKeys ON (Key) VALUES (20u); + + DELETE FROM SecondaryKeys ON (Key) VALUES (10u); + + SELECT * FROM SecondaryKeys VIEW Index; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(3))); + CompareYson(FormatResultSetYson(result.GetResultSet(1)), FormatResultSetYson(result.GetResultSet(2))); + } } } // namespace NKqp |