diff options
| -rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp | 94 | ||||
| -rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 55 |
2 files changed, 133 insertions, 16 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 61f4e02f688..d09a196b7af 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -466,7 +466,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { } } - Y_UNIT_TEST(AlterTableAddNotNullWithDefaultIndexed) { + Y_UNIT_TEST(IndexedTableAndNotNullColumn) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableSequences(false); @@ -583,6 +583,98 @@ Y_UNIT_TEST_SUITE(KqpConstraints) { } + Y_UNIT_TEST(IndexedTableAndNotNullColumnAddNotNullColumn) { + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableSequences(false); + appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true); + appConfig.MutableFeatureFlags()->SetEnableAddColumsWithDefaults(true); + + TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()).SetAppConfig(appConfig)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/AlterTableAddNotNullColumn` ( + Key Uint32, + Value String, + Value2 Int32 NOT NULL DEFAULT 1, + PRIMARY KEY (Key), + INDEX ByValue GLOBAL ON (Value) COVER (Value2) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + } + + auto fQuery = [&](TString query) -> TString { + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.KeepInQueryCache(true); + execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + + auto result = + session + .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), + execSettings) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + if (result.GetResultSets().size() > 0) + return NYdb::FormatResultSetYson(result.GetResultSet(0)); + return ""; + }; + + fQuery(R"( + UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old"); + )"); + + auto fCompareTable = [&](TString expected) { + TString query = R"( + SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key; + )"; + CompareYson(expected, fQuery(query)); + }; + + fCompareTable(R"( + [ + [[1u];["Old"];1] + ] + )"); + + fQuery(R"( + INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New"); + )"); + + fCompareTable(R"( + [ + [[1u];["Old"];1];[[2u];["New"];1] + ] + )"); + + { + auto query = R"( + --!syntax_v1 + ALTER TABLE `/Root/AlterTableAddNotNullColumn` ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7; + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, + result.GetIssues().ToString()); + } + + fCompareTable(R"( + [ + [[1u];["Old"];1;7];[[2u];["New"];1;7] + ] + )"); + + } + Y_UNIT_TEST(AlterTableAddNotNullWithDefault) { NKikimrConfig::TAppConfig appConfig; diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 0765fe1a29f..bccb959d30c 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -371,21 +371,6 @@ private: auto& entry = ResolveNamesResult->ResultSet.front(); - for (const auto& index : entry.Indexes) { - switch (index.GetType()) { - case NKikimrSchemeOp::EIndexTypeGlobalAsync: - if (AppData(ctx)->FeatureFlags.GetEnableBulkUpsertToAsyncIndexedTables()) { - continue; - } else { - errorMessage = "Bulk upsert is not supported for tables with indexes"; - return false; - } - default: - errorMessage = "Only async-indexed tables are supported by BulkUpsert"; - return false; - } - } - TVector<ui32> keyColumnIds; THashMap<TString, ui32> columnByName; THashSet<TString> keyColumnsLeft; @@ -517,6 +502,40 @@ private: } } + std::unordered_set<std::string_view> UpdatingValueColumns; + if (UpsertIfExists) { + for(const auto& name: ValueColumnNames) { + UpdatingValueColumns.emplace(name); + } + } + + for (const auto& index : entry.Indexes) { + if (index.GetType() == NKikimrSchemeOp::EIndexTypeGlobalAsync && + AppData(ctx)->FeatureFlags.GetEnableBulkUpsertToAsyncIndexedTables()) { + continue; + } + + bool allowUpdate = UpsertIfExists; + for(auto& column : index.GetKeyColumnNames()) { + allowUpdate &= (UpdatingValueColumns.find(column) == UpdatingValueColumns.end()); + if (!allowUpdate) { + break; + } + } + + for(auto& column : index.GetDataColumnNames()) { + allowUpdate &= (UpdatingValueColumns.find(column) == UpdatingValueColumns.end()); + if (!allowUpdate) { + break; + } + } + + if (!allowUpdate) { + errorMessage = "Only async-indexed tables are supported by BulkUpsert"; + return false; + } + } + if (makeYqbSchema) { Id2Position.clear(); YdbSchema.resize(KeyColumnTypes.size() + ValueColumnTypes.size()); @@ -545,6 +564,12 @@ private: return false; } + if (!notNullColumnsLeft.empty() && UpsertIfExists) { + // columns are not specified but upsert is executed in update mode + // and we will not change these not null columns. + notNullColumnsLeft.clear(); + } + if (!notNullColumnsLeft.empty()) { errorMessage = Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str()); return false; |
