summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp94
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h55
2 files changed, 133 insertions, 16 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
index 61f4e02f688..d09a196b7af 100644
--- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
@@ -466,7 +466,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
}
}
- Y_UNIT_TEST(AlterTableAddNotNullWithDefaultIndexed) {
+ Y_UNIT_TEST(IndexedTableAndNotNullColumn) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableSequences(false);
@@ -583,6 +583,98 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
}
+ Y_UNIT_TEST(IndexedTableAndNotNullColumnAddNotNullColumn) {
+
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableSequences(false);
+ appConfig.MutableTableServiceConfig()->SetEnableColumnsWithDefault(true);
+ appConfig.MutableFeatureFlags()->SetEnableAddColumsWithDefaults(true);
+
+ TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()).SetAppConfig(appConfig));
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/AlterTableAddNotNullColumn` (
+ Key Uint32,
+ Value String,
+ Value2 Int32 NOT NULL DEFAULT 1,
+ PRIMARY KEY (Key),
+ INDEX ByValue GLOBAL ON (Value) COVER (Value2)
+ );
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS,
+ result.GetIssues().ToString());
+ }
+
+ auto fQuery = [&](TString query) -> TString {
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.KeepInQueryCache(true);
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
+
+ auto result =
+ session
+ .ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(),
+ execSettings)
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS,
+ result.GetIssues().ToString());
+ if (result.GetResultSets().size() > 0)
+ return NYdb::FormatResultSetYson(result.GetResultSet(0));
+ return "";
+ };
+
+ fQuery(R"(
+ UPSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (1, "Old");
+ )");
+
+ auto fCompareTable = [&](TString expected) {
+ TString query = R"(
+ SELECT * FROM `/Root/AlterTableAddNotNullColumn` ORDER BY Key;
+ )";
+ CompareYson(expected, fQuery(query));
+ };
+
+ fCompareTable(R"(
+ [
+ [[1u];["Old"];1]
+ ]
+ )");
+
+ fQuery(R"(
+ INSERT INTO `/Root/AlterTableAddNotNullColumn` (Key, Value) VALUES (2, "New");
+ )");
+
+ fCompareTable(R"(
+ [
+ [[1u];["Old"];1];[[2u];["New"];1]
+ ]
+ )");
+
+ {
+ auto query = R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/AlterTableAddNotNullColumn` ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7;
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS,
+ result.GetIssues().ToString());
+ }
+
+ fCompareTable(R"(
+ [
+ [[1u];["Old"];1;7];[[2u];["New"];1;7]
+ ]
+ )");
+
+ }
+
Y_UNIT_TEST(AlterTableAddNotNullWithDefault) {
NKikimrConfig::TAppConfig appConfig;
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 0765fe1a29f..bccb959d30c 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -371,21 +371,6 @@ private:
auto& entry = ResolveNamesResult->ResultSet.front();
- for (const auto& index : entry.Indexes) {
- switch (index.GetType()) {
- case NKikimrSchemeOp::EIndexTypeGlobalAsync:
- if (AppData(ctx)->FeatureFlags.GetEnableBulkUpsertToAsyncIndexedTables()) {
- continue;
- } else {
- errorMessage = "Bulk upsert is not supported for tables with indexes";
- return false;
- }
- default:
- errorMessage = "Only async-indexed tables are supported by BulkUpsert";
- return false;
- }
- }
-
TVector<ui32> keyColumnIds;
THashMap<TString, ui32> columnByName;
THashSet<TString> keyColumnsLeft;
@@ -517,6 +502,40 @@ private:
}
}
+ std::unordered_set<std::string_view> UpdatingValueColumns;
+ if (UpsertIfExists) {
+ for(const auto& name: ValueColumnNames) {
+ UpdatingValueColumns.emplace(name);
+ }
+ }
+
+ for (const auto& index : entry.Indexes) {
+ if (index.GetType() == NKikimrSchemeOp::EIndexTypeGlobalAsync &&
+ AppData(ctx)->FeatureFlags.GetEnableBulkUpsertToAsyncIndexedTables()) {
+ continue;
+ }
+
+ bool allowUpdate = UpsertIfExists;
+ for(auto& column : index.GetKeyColumnNames()) {
+ allowUpdate &= (UpdatingValueColumns.find(column) == UpdatingValueColumns.end());
+ if (!allowUpdate) {
+ break;
+ }
+ }
+
+ for(auto& column : index.GetDataColumnNames()) {
+ allowUpdate &= (UpdatingValueColumns.find(column) == UpdatingValueColumns.end());
+ if (!allowUpdate) {
+ break;
+ }
+ }
+
+ if (!allowUpdate) {
+ errorMessage = "Only async-indexed tables are supported by BulkUpsert";
+ return false;
+ }
+ }
+
if (makeYqbSchema) {
Id2Position.clear();
YdbSchema.resize(KeyColumnTypes.size() + ValueColumnTypes.size());
@@ -545,6 +564,12 @@ private:
return false;
}
+ if (!notNullColumnsLeft.empty() && UpsertIfExists) {
+ // columns are not specified but upsert is executed in update mode
+ // and we will not change these not null columns.
+ notNullColumnsLeft.clear();
+ }
+
if (!notNullColumnsLeft.empty()) {
errorMessage = Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str());
return false;