aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2022-12-14 14:53:20 +0300
committerulya-sidorina <yulia@ydb.tech>2022-12-14 14:53:20 +0300
commit6aec14798ad91ed132f3da681c3d5b9c6fb2240d (patch)
treec5b223b8a792ec0e7938b7c676cd03acea5eec70
parent7b2317ba8baa3414534e276c89e74f3033be9f69 (diff)
downloadydb-6aec14798ad91ed132f3da681c3d5b9c6fb2240d.tar.gz
Support immediate effects for secondary indexes
feature(kqp): support immediate effects for secondary indexes
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt.cpp2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_opt_build.cpp230
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider_impl.h2
-rw-r--r--ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp209
4 files changed, 404 insertions, 39 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt.cpp b/ydb/core/kqp/provider/yql_kikimr_opt.cpp
index f38393e64a..ca3dfce32e 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt.cpp
@@ -75,7 +75,7 @@ TAutoPtr<IGraphTransformer> CreateKiPhysicalOptProposalTransformer(TIntrusivePtr
if (auto maybeDatasink = node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) {
auto cluster = TString(maybeDatasink.Cast().Cluster());
- ret = KiBuildQuery(node, ctx);
+ ret = KiBuildQuery(node, ctx, sessionCtx->TablesPtr());
if (ret != inputNode) {
return ret;
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index 86d4aab743..983cebbb7c 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -63,11 +63,19 @@ ui64 GetResultRowsLimit(const TResWriteBase& resWrite) {
return 0;
}
+enum class TPrimitiveYdbOperation : ui32 {
+ Read = 1 << 0,
+ Write = 1 << 1
+};
+
+Y_DECLARE_FLAGS(TPrimitiveYdbOperations, TPrimitiveYdbOperation)
+Y_DECLARE_OPERATORS_FOR_FLAGS(TPrimitiveYdbOperations)
+
struct TKiExploreTxResults {
struct TKiQueryBlock {
TVector<TExprBase> Results;
TVector<TExprBase> Effects;
- THashMap<TString, TYdbOperations> TableOperations;
+ THashMap<TString, TPrimitiveYdbOperations> TableOperations;
bool HasUncommittedChangesRead = false;
};
@@ -96,9 +104,114 @@ struct TKiExploreTxResults {
}
}
- void AddEffect(const TExprBase& effect, TYdbOperation op, std::string_view table) {
- auto readOp = op & KikimrReadOps();
- auto uncommittedChangesRead = HasModifyOps(table) && readOp;
+ void AddReadOpToQueryBlock(const TKikimrKey& key, const TCoAtomList& readColumns, TKikimrTableMetadataPtr tableMeta) {
+ YQL_ENSURE(tableMeta, "Empty table metadata");
+
+ bool uncommittedChangesRead = false;
+ if (key.GetView()) {
+ const auto& indexName = key.GetView().GetRef();
+ const auto indexTablePath = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, indexName);
+
+ auto indexIt = std::find_if(tableMeta->Indexes.begin(), tableMeta->Indexes.end(), [&indexName](const auto& index){
+ return index.Name == indexName;
+ });
+ YQL_ENSURE(indexIt != tableMeta->Indexes.end(), "Index not found");
+
+ THashSet<TString> indexColumns;
+ indexColumns.reserve(indexIt->KeyColumns.size() + indexIt->DataColumns.size());
+ for (const auto& keyColumn : indexIt->KeyColumns) {
+ indexColumns.insert(keyColumn);
+ }
+
+ for (const auto& column : indexIt->DataColumns) {
+ indexColumns.insert(column);
+ }
+
+ bool needMainTableRead = false;
+ for (const auto& col : readColumns) {
+ if (!indexColumns.contains(col.StringValue())) {
+ needMainTableRead = true;
+ break;
+ }
+ }
+
+ uncommittedChangesRead = HasWriteOps(indexTablePath) || (needMainTableRead && HasWriteOps(tableMeta->Name));
+ } else {
+ uncommittedChangesRead = HasWriteOps(tableMeta->Name);
+ }
+
+ if (uncommittedChangesRead) {
+ AddQueryBlock();
+ SetBlockHasUncommittedChangesRead();
+ }
+ }
+
+ void AddWriteOpToQueryBlock(const TExprBase& effect, TKikimrTableMetadataPtr tableMeta, bool needMainTableRead) {
+ YQL_ENSURE(tableMeta, "Empty table metadata");
+
+ THashMap<TString, TPrimitiveYdbOperations> ops;
+ if (needMainTableRead) {
+ ops[tableMeta->Name] |= TPrimitiveYdbOperation::Read;
+ }
+ ops[tableMeta->Name] |= TPrimitiveYdbOperation::Write;
+
+ for (const auto& index : tableMeta->Indexes) {
+ if (!index.ItUsedForWrite()) {
+ continue;
+ }
+
+ const auto indexTable = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, index.Name);
+
+ ops[tableMeta->Name] |= TPrimitiveYdbOperation::Read;
+ ops[indexTable] = TPrimitiveYdbOperation::Write;
+ }
+
+ AddEffect(effect, ops);
+ }
+
+ void AddUpdateOpToQueryBlock(const TExprBase& effect, TKikimrTableMetadataPtr tableMeta,
+ const THashSet<std::string_view>& updateColumns) {
+ YQL_ENSURE(tableMeta, "Empty table metadata");
+
+ THashMap<TString, TPrimitiveYdbOperations> ops;
+ // read and upsert rows into main table
+ ops[tableMeta->Name] = TPrimitiveYdbOperation::Read | TPrimitiveYdbOperation::Write;
+
+ for (const auto& index : tableMeta->Indexes) {
+ if (!index.ItUsedForWrite()) {
+ continue;
+ }
+
+ const auto indexTable = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, index.Name);
+ for (const auto& column : index.KeyColumns) {
+ if (updateColumns.contains(column)) {
+ // delete old index values and upsert rows into index table
+ ops[indexTable] = TPrimitiveYdbOperation::Write;
+ break;
+ }
+ }
+
+ for (const auto& column : index.DataColumns) {
+ if (updateColumns.contains(column)) {
+ // upsert rows into index table
+ ops[indexTable] = TPrimitiveYdbOperation::Write;
+ break;
+ }
+ }
+ }
+
+ AddEffect(effect, ops);
+ }
+
+ void AddEffect(const TExprBase& effect, const THashMap<TString, TPrimitiveYdbOperations>& ops) {
+ bool uncommittedChangesRead = false;
+ for (const auto& [table, op] : ops) {
+ auto readOp = op & TPrimitiveYdbOperation::Read;
+ if (readOp && HasWriteOps(table)) {
+ uncommittedChangesRead = true;
+ break;
+ }
+ }
if (QueryBlocks.empty() || uncommittedChangesRead) {
AddQueryBlock();
@@ -107,8 +220,11 @@ struct TKiExploreTxResults {
auto& curBlock = QueryBlocks.back();
curBlock.Effects.push_back(effect);
curBlock.HasUncommittedChangesRead = uncommittedChangesRead;
- auto& currentOps = curBlock.TableOperations[table];
- currentOps |= op;
+
+ for (const auto& [table, op] : ops) {
+ auto& currentOps = curBlock.TableOperations[table];
+ currentOps |= op;
+ }
}
void AddResult(const TExprBase& result) {
@@ -120,14 +236,14 @@ struct TKiExploreTxResults {
curBlock.Results.push_back(result);
}
- bool HasModifyOps(std::string_view table) {
+ bool HasWriteOps(std::string_view table) {
if (QueryBlocks.empty()) {
return false;
}
auto& curBlock = QueryBlocks.back();
auto currentOps = curBlock.TableOperations[table];
- return currentOps & KikimrModifyOps();
+ return currentOps & TPrimitiveYdbOperation::Write;
}
void AddQueryBlock() {
@@ -144,7 +260,9 @@ struct TKiExploreTxResults {
: HasExecute(false) {}
};
-bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes) {
+bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, TKiExploreTxResults& txRes,
+ TIntrusivePtr<TKikimrTablesData> tablesData) {
+
if (txRes.Ops.cend() != txRes.Ops.find(node.Raw())) {
return true;
}
@@ -156,7 +274,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
if (auto maybeLeft = node.Maybe<TCoLeft>()) {
txRes.Ops.insert(node.Raw());
- return ExploreTx(maybeLeft.Cast().Input(), ctx, dataSink, txRes);
+ return ExploreTx(maybeLeft.Cast().Input(), ctx, dataSink, txRes, tablesData);
}
auto checkDataSource = [dataSink] (const TKiDataSource& ds) {
@@ -180,13 +298,16 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
YQL_ENSURE(key.GetKeyType() == TKikimrKey::Type::Table);
auto table = key.GetTablePath();
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(maybeRead.Cast().World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Select, read.Pos(), ctx));
- if (txRes.HasModifyOps(table)) {
- txRes.AddQueryBlock();
- txRes.SetBlockHasUncommittedChangesRead();
- }
+
+ YQL_ENSURE(tablesData);
+ const auto& tableData = tablesData->ExistingTable(cluster, table);
+ YQL_ENSURE(tableData.Metadata);
+ auto readColumns = read.GetSelectColumns(ctx, tableData);
+ txRes.AddReadOpToQueryBlock(key, readColumns, tableData.Metadata);
+
return result;
}
@@ -198,10 +319,27 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto table = write.Table().Value();
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(write.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(write.World(), ctx, dataSink, txRes, tablesData);
auto tableOp = GetTableOp(write);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, write.Pos(), ctx));
- txRes.AddEffect(node, tableOp, table);
+
+ YQL_ENSURE(tablesData);
+ const auto& tableData = tablesData->ExistingTable(cluster, table);
+ YQL_ENSURE(tableData.Metadata);
+
+ if (tableOp == TYdbOperation::UpdateOn) {
+ auto inputColumnsSetting = GetSetting(write.Settings().Ref(), "input_columns");
+ YQL_ENSURE(inputColumnsSetting);
+ auto inputColumns = TCoNameValueTuple(inputColumnsSetting).Value().Cast<TCoAtomList>();
+ THashSet<std::string_view> updateColumns;
+ for (const auto& column : inputColumns) {
+ updateColumns.emplace(column);
+ }
+ txRes.AddUpdateOpToQueryBlock(node, tableData.Metadata, updateColumns);
+ } else {
+ txRes.AddWriteOpToQueryBlock(node, tableData.Metadata, tableOp & KikimrReadOps());
+ }
+
return result;
}
@@ -213,9 +351,21 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto table = update.Table().Value();
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(update.World(), ctx, dataSink, txRes);
- txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Update, update.Pos(), ctx));
- txRes.AddEffect(node, TYdbOperation::Update, table);
+ auto result = ExploreTx(update.World(), ctx, dataSink, txRes, tablesData);
+ const auto tableOp = TYdbOperation::Update;
+ txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, update.Pos(), ctx));
+
+ YQL_ENSURE(tablesData);
+ const auto& tableData = tablesData->ExistingTable(cluster, table);
+ YQL_ENSURE(tableData.Metadata);
+
+ THashSet<std::string_view> updateColumns;
+ const auto& updateStructType = update.Update().Ref().GetTypeAnn()->Cast<TStructExprType>();
+ for (const auto& item : updateStructType->GetItems()) {
+ updateColumns.emplace(item->GetName());
+ }
+ txRes.AddUpdateOpToQueryBlock(node, tableData.Metadata, updateColumns);
+
return result;
}
@@ -227,9 +377,15 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto table = del.Table().Value();
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(del.World(), ctx, dataSink, txRes);
- txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::Delete, del.Pos(), ctx));
- txRes.AddEffect(node, TYdbOperation::Delete, table);
+ auto result = ExploreTx(del.World(), ctx, dataSink, txRes, tablesData);
+ const auto tableOp = TYdbOperation::Delete;
+ txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, tableOp, del.Pos(), ctx));
+
+ YQL_ENSURE(tablesData);
+ const auto& tableData = tablesData->ExistingTable(cluster, table);
+ YQL_ENSURE(tableData.Metadata);
+ txRes.AddWriteOpToQueryBlock(node, tableData.Metadata, tableOp & KikimrReadOps());
+
return result;
}
@@ -241,7 +397,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto table = create.Table().Value();
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(create.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(create.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::CreateTable, create.Pos(), ctx));
return result;
}
@@ -254,7 +410,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto table = drop.Table().Value();
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(drop.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(drop.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::DropTable, drop.Pos(), ctx));
return result;
}
@@ -267,7 +423,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
auto table = alter.Table().Value();
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(alter.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(alter.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildTableOpNode(cluster, table, TYdbOperation::AlterTable, alter.Pos(), ctx));
return result;
}
@@ -279,7 +435,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(createUser.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(createUser.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::CreateUser, createUser.Pos(), ctx));
return result;
}
@@ -291,7 +447,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(alterUser.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(alterUser.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::AlterUser, alterUser.Pos(), ctx));
return result;
}
@@ -303,7 +459,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(dropUser.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(dropUser.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::DropUser, dropUser.Pos(), ctx));
return result;
}
@@ -315,7 +471,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(createGroup.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(createGroup.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::CreateGroup, createGroup.Pos(), ctx));
return result;
}
@@ -327,7 +483,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(alterGroup.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(alterGroup.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::AlterGroup, alterGroup.Pos(), ctx));
return result;
}
@@ -339,7 +495,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
txRes.Ops.insert(node.Raw());
- auto result = ExploreTx(dropGroup.World(), ctx, dataSink, txRes);
+ auto result = ExploreTx(dropGroup.World(), ctx, dataSink, txRes, tablesData);
txRes.TableOperations.push_back(BuildYdbOpNode(cluster, TYdbOperation::DropGroup, dropGroup.Pos(), ctx));
return result;
}
@@ -362,13 +518,13 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
return true;
}
- return ExploreTx(commit.World(), ctx, dataSink, txRes);
+ return ExploreTx(commit.World(), ctx, dataSink, txRes, tablesData);
}
if (auto maybeSync = node.Maybe<TCoSync>()) {
txRes.Ops.insert(node.Raw());
for (auto child : maybeSync.Cast()) {
- if (!ExploreTx(child, ctx, dataSink, txRes)) {
+ if (!ExploreTx(child, ctx, dataSink, txRes, tablesData)) {
return false;
}
@@ -380,7 +536,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
node.Maybe<TResPull>())
{
txRes.Ops.insert(node.Raw());
- bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes);
+ bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes, tablesData);
txRes.AddResult(node);
return result;
}
@@ -534,7 +690,7 @@ TKiDataQuery MakeKiDataQuery(TExprBase node, const TKiExploreTxResults& txExplor
} // namespace
-TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx) {
+TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData) {
if (!node.Maybe<TCoCommit>().DataSink().Maybe<TKiDataSink>()) {
return node.Ptr();
}
@@ -544,7 +700,7 @@ TExprNode::TPtr KiBuildQuery(TExprBase node, TExprContext& ctx) {
auto kiDataSink = commit.DataSink().Cast<TKiDataSink>();
TKiExploreTxResults txExplore;
- if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore)) {
+ if (!ExploreTx(commit.World(), ctx, kiDataSink, txExplore, tablesData)) {
return node.Ptr();
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
index e989605dac..d1d8ba2139 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider_impl.h
@@ -174,7 +174,7 @@ void TableDescriptionToTableInfo(const TKikimrTableDescription& desc, TYdbOperat
TVector<NKqpProto::TKqpTableInfo>& infos);
// Optimizer rules
-TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx);
+TExprNode::TPtr KiBuildQuery(NNodes::TExprBase node, TExprContext& ctx, TIntrusivePtr<TKikimrTablesData> tablesData);
TExprNode::TPtr KiBuildResult(NNodes::TExprBase node, const TString& cluster, TExprContext& ctx);
const THashSet<TStringBuf>& KikimrDataSourceFunctions();
diff --git a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
index 5f39fefbe3..9fa4704b1d 100644
--- a/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
+++ b/ydb/core/kqp/ut/effects/kqp_immediate_effects_ut.cpp
@@ -673,6 +673,215 @@ Y_UNIT_TEST_SUITE(KqpImmediateEffects) {
[[4u];["Four"]]
])", FormatResultSetYson(result.GetResultSet(1)));
}
+
+ Y_UNIT_TEST(UpsertAfterInsert) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateTestTable(session);
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "Three");
+ UPSERT INTO TestImmediateEffects (Key, Value) VALUES (3u, "NewValue3");
+
+ SELECT * FROM TestImmediateEffects;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]];
+ [[3u];["NewValue3"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ Y_UNIT_TEST(UpsertAfterInsertWithIndex) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateSampleTablesWithIndex(session);
+
+ { // secondary key
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO SecondaryKeys (Key, Fk, Value) VALUES
+ (6u, 6u, "Payload6");
+
+ UPSERT INTO SecondaryKeys (Key, Fk, Value) VALUES
+ (6u, 60u, "Payload60");
+
+ SELECT * FROM SecondaryKeys VIEW Index;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [#;#;["Payload8"]];
+ [#;[7];["Payload7"]];
+ [[1];[1];["Payload1"]];
+ [[2];[2];["Payload2"]];
+ [[5];[5];["Payload5"]];
+ [[60];[6];["Payload60"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ { // secondary complex keys
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO SecondaryComplexKeys (Key, Fk1, Fk2, Value) VALUES
+ (8u, 8u, "Fk8", "Payload8");
+
+ UPSERT INTO SecondaryComplexKeys (Key, Fk1, Fk2) VALUES
+ (8u, 8u, "Fk9");
+
+ SELECT * FROM SecondaryComplexKeys VIEW Index;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [#;#;#;["Payload8"]];
+ [#;["Fk7"];[7];["Payload7"]];
+ [[1];["Fk1"];[1];["Payload1"]];
+ [[2];["Fk2"];[2];["Payload2"]];
+ [[5];["Fk5"];[5];["Payload5"]];
+ [[8];["Fk9"];[8];["Payload8"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+
+ { // secondary index with data column
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ INSERT INTO SecondaryWithDataColumns (Key, Index2, Value) VALUES
+ ("Primary2", "Secondary2", "Value2");
+
+ UPSERT INTO SecondaryWithDataColumns (Key, Index2, Value) VALUES
+ ("Primary2", "Secondary22", "Value22");
+
+ SELECT * FROM SecondaryWithDataColumns VIEW Index;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [#;["Secondary1"];["Primary1"];["Value1"]];
+ [#;["Secondary22"];["Primary2"];["Value22"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
+ }
+
+ Y_UNIT_TEST(DeleteOnAfterInsertWithIndex) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateSampleTablesWithIndex(session);
+
+ { // secondary key
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM SecondaryKeys;
+
+ INSERT INTO SecondaryKeys (Key, Fk, Value) VALUES
+ (6u, 6u, "Payload6");
+
+ DELETE FROM SecondaryKeys ON (Key) VALUES (6u);
+
+ SELECT * FROM SecondaryKeys;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1)));
+ }
+
+ { // secondary complex keys
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM SecondaryComplexKeys VIEW Index;
+
+ INSERT INTO SecondaryComplexKeys (Key, Fk1, Fk2, Value) VALUES
+ (8u, 8u, "Fk8", "Payload8");
+
+ DELETE FROM SecondaryComplexKeys ON (Key) VALUES (8u);
+
+ SELECT * FROM SecondaryComplexKeys VIEW Index;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1)));
+ }
+
+ { // secondary index with data column
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM SecondaryWithDataColumns VIEW Index;
+
+ INSERT INTO SecondaryWithDataColumns (Key, Index2, Value) VALUES
+ ("Primary2", "Secondary2", "Value2");
+
+ DELETE FROM SecondaryWithDataColumns ON (Key) VALUES ("Primary2");
+
+ SELECT * FROM SecondaryWithDataColumns VIEW Index;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1)));
+ }
+ }
+
+ Y_UNIT_TEST(MultipleEffectsWithIndex) {
+ auto serverSettings = TKikimrSettings()
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpImmediateEffects(true);
+ TKikimrRunner kikimr(serverSettings);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ CreateSampleTablesWithIndex(session);
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM SecondaryKeys VIEW Index;
+
+ INSERT INTO SecondaryKeys (Key, Fk, Value) VALUES
+ (10u, 10u, "Payload10");
+
+ UPSERT INTO SecondaryKeys (Key, Fk, Value) VALUES
+ (20u, 20u, "Payload20");
+
+ SELECT * FROM SecondaryKeys VIEW Index;
+
+ UPDATE SecondaryKeys ON (Key, Fk) VALUES
+ (20u, 21u);
+
+ UPDATE SecondaryKeys SET Fk = 20u WHERE Key = 20u;
+
+ SELECT * FROM SecondaryKeys VIEW Index;
+
+ DELETE FROM SecondaryKeys ON (Key) VALUES (20u);
+
+ DELETE FROM SecondaryKeys ON (Key) VALUES (10u);
+
+ SELECT * FROM SecondaryKeys VIEW Index;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(3)));
+ CompareYson(FormatResultSetYson(result.GetResultSet(1)), FormatResultSetYson(result.GetResultSet(2)));
+ }
}
} // namespace NKqp