diff options
| author | Daniil Timizhev <[email protected]> | 2025-10-25 03:43:24 +0300 | 
|---|---|---|
| committer | GitHub <[email protected]> | 2025-10-25 00:43:24 +0000 | 
| commit | ab5870365b8c9a298dbe9b0dc9a71035a25f3898 (patch) | |
| tree | 00db4d69d171ae477cb718f7e37de63523543a05 | |
| parent | cf7b87df54ae9bf279568df3c1029dcd1e09613e (diff) | |
Impl methods to detect BulkUpsert with DEFAULT columns and a flag to disable (#27596)
Co-authored-by: Copilot <[email protected]>
| -rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp | 107 | ||||
| -rw-r--r-- | ydb/core/protos/feature_flags.proto | 1 | ||||
| -rw-r--r-- | ydb/core/testlib/basics/feature_flags.h | 1 | ||||
| -rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 24 | ||||
| -rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_counters.cpp | 1 | ||||
| -rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_counters.h | 6 | 
6 files changed, 137 insertions, 3 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 3ee4d8d536a..9e0dd165cf0 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -782,12 +782,15 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {          TKikimrRunner kikimr(TKikimrSettings()              .SetUseRealThreads(false)              .SetEnableAddColumsWithDefaults(true) +            .SetDisableMissingDefaultColumnsInBulkUpsert(true)              .SetWithSampleTables(false));          auto db = kikimr.RunCall([&] { return kikimr.GetQueryClient(); } );          auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );          auto querySession = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } ); +        auto tableClient = kikimr.RunCall([&] { return kikimr.GetTableClient(); } ); +          auto& runtime = *kikimr.GetTestServer().GetRuntime();          { @@ -860,7 +863,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {          auto alterQuery = R"(              ALTER TABLE `/Root/AddNonColumnDoesnotReturnInternalError` -            ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7; +            ADD COLUMN Value3 Int32 DEFAULT 7;          )";          auto alterFuture = kikimr.RunInThreadPool([&] { return session.ExecuteQuery(alterQuery, TTxControl::NoTx()).GetValueSync(); }); @@ -895,6 +898,29 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {          }          { +            auto rowsBuilder = NYdb::TValueBuilder(); +            rowsBuilder.BeginList(); +            for (ui32 i = 10; i <= 15; ++i) { +                rowsBuilder.AddListItem() +                    .BeginStruct() +                    .AddMember("Key") +                        .Uint32(i) +                    .AddMember("Value") +                        .String("String") +                    .AddMember("Value2") +                        .String("String2") +                    .EndStruct(); + +            } +            rowsBuilder.EndList(); +            auto result = kikimr.RunCall([&] { +                return tableClient.BulkUpsert("/Root/AddNonColumnDoesnotReturnInternalError", rowsBuilder.Build()).GetValueSync(); +            }); +            UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString()); +            UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3"); +        } + +        {              TString result = fQuery(R"(                  UPSERT INTO `/Root/AddNonColumnDoesnotReturnInternalError` (Key, Value, Value2, Value3) VALUES (1, "4", "four", 1);              )"); @@ -933,8 +959,8 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {          auto result = runtime.WaitFuture(alterFuture);          fCompareTable(R"([ -            [1u;"Changed";"Updated";7]; -            [2u;"New";"text";7] +            [1u;"Changed";"Updated";[7]]; +            [2u;"New";"text";[7]]          ])");      } @@ -1513,6 +1539,81 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {          }      } +    Y_UNIT_TEST_TWIN(DefaultColumnAndBulkUpsert, DisableMissingDefaultColumnsInBulkUpsert) { +        TKikimrRunner kikimr(TKikimrSettings() +            .SetEnableAddColumsWithDefaults(true) +            .SetDisableMissingDefaultColumnsInBulkUpsert(DisableMissingDefaultColumnsInBulkUpsert) +            .SetWithSampleTables(false)); + +        auto queryClient = kikimr.GetQueryClient(); +        auto tableClient = kikimr.GetTableClient(); + +        { +            auto query = R"( +                CREATE TABLE `/Root/DefaultColumnAndBulkUpsert` ( +                    Key Uint32 NOT NULL, +                    Value1 String DEFAULT "Default value", +                    Value2 Int64 DEFAULT 123, +                    PRIMARY KEY (Key), +                ); +            )"; + +            auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); +            UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +        } + +        { +            auto query = R"( +                UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key) VALUES (1), (2); +            )"; + +            auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); +            UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +        } + +        { +            auto query = R"( +                UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key, Value1) VALUES (3, "Value1"), (4, "Value2"); +            )"; + +            auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); +            UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +        } + +        { +            auto query = R"( +                ALTER TABLE `/Root/DefaultColumnAndBulkUpsert` ADD COLUMN Value3 Utf8 DEFAULT "Value3"u; +            )"; + +            auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); +            UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); +        } + +        { +            auto rowsBuilder = NYdb::TValueBuilder(); +            rowsBuilder.BeginList(); +            for (ui32 i = 10; i <= 15; ++i) { +                rowsBuilder.AddListItem() +                    .BeginStruct() +                    .AddMember("Key") +                        .Uint32(i) +                    .AddMember("Value2") +                        .OptionalInt64(0) +                    .EndStruct(); + +            } +            rowsBuilder.EndList(); + +            auto result = tableClient.BulkUpsert("/Root/DefaultColumnAndBulkUpsert", rowsBuilder.Build()).ExtractValueSync(); +            if (DisableMissingDefaultColumnsInBulkUpsert) { +                UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString()); +                UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3, Value1"); +            } else { +                UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); +            } +        } +    } +      // Y_UNIT_TEST(SetNotNull) {      //     struct TValue {      //     private: diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index b86de316028..3b590034c4a 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -238,4 +238,5 @@ message TFeatureFlags {      optional bool EnableTopicMessageLevelParallelism = 212 [default = false];      optional bool EnableOlapRejectProbability = 213 [default = false];      optional bool EnablePDiskLogForSmallDisks = 214 [default = false]; +    optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];  } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index dde567ebab0..4fe1f6170b0 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -83,6 +83,7 @@ public:      FEATURE_FLAG_SETTER(EnableDataShardWriteAlwaysVolatile)      FEATURE_FLAG_SETTER(EnableStreamingQueries)      FEATURE_FLAG_SETTER(EnableSecureScriptExecutions) +    FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert)      #undef FEATURE_FLAG_SETTER  }; diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 8688f9e52d8..d99f1dd916a 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -381,6 +381,7 @@ private:          THashMap<TString, ui32> columnByName;          THashSet<TString> keyColumnsLeft;          THashSet<TString> notNullColumnsLeft = entry.NotNullColumns; +        THashSet<TString> defaultColumnsLeft;          SrcColumns.reserve(entry.Columns.size());          THashSet<TString> HasInternalConversion; @@ -400,6 +401,10 @@ private:                  keyColumnIds[keyOrder] = id;                  keyColumnsLeft.insert(name);              } + +            if (colInfo.IsDefaultFromLiteral()) { +                defaultColumnsLeft.insert(name); +            }          }          if (entry.ColumnTableInfo) { @@ -498,6 +503,10 @@ private:                  NotNullColumns.emplace(ci.Name);              } +            if (defaultColumnsLeft.contains(ci.Name)) { +                defaultColumnsLeft.erase(ci.Name); +            } +              if (ci.KeyOrder != -1) {                  KeyColumnPositions[ci.KeyOrder] = TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, pgTypeMod, notNull};                  keyColumnsLeft.erase(ci.Name); @@ -582,6 +591,21 @@ private:              return TConclusionStatus::Fail(Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str()));          } +        if (!defaultColumnsLeft.empty() && UpsertIfExists) { +            // some default columns are not specified in the request, but upsert will only update existing rows +            // and only the columns specified in the request will be updated; unspecified default columns will not be changed. +            defaultColumnsLeft.clear(); +        } + +        if (!defaultColumnsLeft.empty()) { +            if (AppData(ctx)->FeatureFlags.GetDisableMissingDefaultColumnsInBulkUpsert()) { +                return TConclusionStatus::Fail(Sprintf("Missing default columns: %s", JoinSeq(", ", defaultColumnsLeft).c_str())); +            } + +            UploadCounters.OnMissingDefaultColumns(); +            LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Missing default columns: " << JoinSeq(", ", defaultColumnsLeft).c_str()); +        } +          TConclusionStatus res = TConclusionStatus::Success();          if (isColumnTable && HasAppData() && AppDataVerified().ColumnShardConfig.GetBulkUpsertRequireAllColumns()) {              res = CheckRequiredColumns(entry, *reqColumns); diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.cpp b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp index 9e39f37298c..16fbd5ed4ac 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_counters.cpp +++ b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp @@ -99,5 +99,6 @@ TUploadCounters::TUploadCounters()              WrittenBytes = TBase::GetDeriviative("Replies/WrittenBytes");              FailedBytes = TBase::GetDeriviative("Replies/FailedBytes");              RequestsBytes = TBase::GetDeriviative("Requests/Bytes"); +            MissingDefaultColumnsCount = TBase::GetDeriviative("MissingDefaultColumns/Count");  }  } diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.h b/ydb/core/tx/tx_proxy/upload_rows_counters.h index b71e94b3ab2..76d6cea8513 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_counters.h +++ b/ydb/core/tx/tx_proxy/upload_rows_counters.h @@ -76,6 +76,8 @@ private:      NMonitoring::TDynamicCounters::TCounterPtr FailedBytes;      NMonitoring::TDynamicCounters::TCounterPtr RequestsBytes; +    NMonitoring::TDynamicCounters::TCounterPtr MissingDefaultColumnsCount; +      THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount;      NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status); @@ -142,6 +144,10 @@ public:          PackageSizeCountByRecords->Collect(rowsCount);          RequestsBytes->Add(requestBytes);      } + +    void OnMissingDefaultColumns() { +        MissingDefaultColumnsCount->Inc(); +    }  };  }   // namespace NKikimr  | 
