summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Timizhev <[email protected]>2025-10-25 03:43:24 +0300
committerGitHub <[email protected]>2025-10-25 00:43:24 +0000
commitab5870365b8c9a298dbe9b0dc9a71035a25f3898 (patch)
tree00db4d69d171ae477cb718f7e37de63523543a05
parentcf7b87df54ae9bf279568df3c1029dcd1e09613e (diff)
Impl methods to detect BulkUpsert with DEFAULT columns and a flag to disable (#27596)
Co-authored-by: Copilot <[email protected]>
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp107
-rw-r--r--ydb/core/protos/feature_flags.proto1
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h24
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_counters.cpp1
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_counters.h6
6 files changed, 137 insertions, 3 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
index 3ee4d8d536a..9e0dd165cf0 100644
--- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
@@ -782,12 +782,15 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
TKikimrRunner kikimr(TKikimrSettings()
.SetUseRealThreads(false)
.SetEnableAddColumsWithDefaults(true)
+ .SetDisableMissingDefaultColumnsInBulkUpsert(true)
.SetWithSampleTables(false));
auto db = kikimr.RunCall([&] { return kikimr.GetQueryClient(); } );
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
auto querySession = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
+ auto tableClient = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
+
auto& runtime = *kikimr.GetTestServer().GetRuntime();
{
@@ -860,7 +863,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
auto alterQuery = R"(
ALTER TABLE `/Root/AddNonColumnDoesnotReturnInternalError`
- ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7;
+ ADD COLUMN Value3 Int32 DEFAULT 7;
)";
auto alterFuture = kikimr.RunInThreadPool([&] { return session.ExecuteQuery(alterQuery, TTxControl::NoTx()).GetValueSync(); });
@@ -895,6 +898,29 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
}
{
+ auto rowsBuilder = NYdb::TValueBuilder();
+ rowsBuilder.BeginList();
+ for (ui32 i = 10; i <= 15; ++i) {
+ rowsBuilder.AddListItem()
+ .BeginStruct()
+ .AddMember("Key")
+ .Uint32(i)
+ .AddMember("Value")
+ .String("String")
+ .AddMember("Value2")
+ .String("String2")
+ .EndStruct();
+
+ }
+ rowsBuilder.EndList();
+ auto result = kikimr.RunCall([&] {
+ return tableClient.BulkUpsert("/Root/AddNonColumnDoesnotReturnInternalError", rowsBuilder.Build()).GetValueSync();
+ });
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3");
+ }
+
+ {
TString result = fQuery(R"(
UPSERT INTO `/Root/AddNonColumnDoesnotReturnInternalError` (Key, Value, Value2, Value3) VALUES (1, "4", "four", 1);
)");
@@ -933,8 +959,8 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
auto result = runtime.WaitFuture(alterFuture);
fCompareTable(R"([
- [1u;"Changed";"Updated";7];
- [2u;"New";"text";7]
+ [1u;"Changed";"Updated";[7]];
+ [2u;"New";"text";[7]]
])");
}
@@ -1513,6 +1539,81 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
}
}
+ Y_UNIT_TEST_TWIN(DefaultColumnAndBulkUpsert, DisableMissingDefaultColumnsInBulkUpsert) {
+ TKikimrRunner kikimr(TKikimrSettings()
+ .SetEnableAddColumsWithDefaults(true)
+ .SetDisableMissingDefaultColumnsInBulkUpsert(DisableMissingDefaultColumnsInBulkUpsert)
+ .SetWithSampleTables(false));
+
+ auto queryClient = kikimr.GetQueryClient();
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ auto query = R"(
+ CREATE TABLE `/Root/DefaultColumnAndBulkUpsert` (
+ Key Uint32 NOT NULL,
+ Value1 String DEFAULT "Default value",
+ Value2 Int64 DEFAULT 123,
+ PRIMARY KEY (Key),
+ );
+ )";
+
+ auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ auto query = R"(
+ UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key) VALUES (1), (2);
+ )";
+
+ auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ auto query = R"(
+ UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key, Value1) VALUES (3, "Value1"), (4, "Value2");
+ )";
+
+ auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ auto query = R"(
+ ALTER TABLE `/Root/DefaultColumnAndBulkUpsert` ADD COLUMN Value3 Utf8 DEFAULT "Value3"u;
+ )";
+
+ auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ {
+ auto rowsBuilder = NYdb::TValueBuilder();
+ rowsBuilder.BeginList();
+ for (ui32 i = 10; i <= 15; ++i) {
+ rowsBuilder.AddListItem()
+ .BeginStruct()
+ .AddMember("Key")
+ .Uint32(i)
+ .AddMember("Value2")
+ .OptionalInt64(0)
+ .EndStruct();
+
+ }
+ rowsBuilder.EndList();
+
+ auto result = tableClient.BulkUpsert("/Root/DefaultColumnAndBulkUpsert", rowsBuilder.Build()).ExtractValueSync();
+ if (DisableMissingDefaultColumnsInBulkUpsert) {
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString());
+ UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3, Value1");
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+ }
+
// Y_UNIT_TEST(SetNotNull) {
// struct TValue {
// private:
diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto
index b86de316028..3b590034c4a 100644
--- a/ydb/core/protos/feature_flags.proto
+++ b/ydb/core/protos/feature_flags.proto
@@ -238,4 +238,5 @@ message TFeatureFlags {
optional bool EnableTopicMessageLevelParallelism = 212 [default = false];
optional bool EnableOlapRejectProbability = 213 [default = false];
optional bool EnablePDiskLogForSmallDisks = 214 [default = false];
+ optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
}
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index dde567ebab0..4fe1f6170b0 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -83,6 +83,7 @@ public:
FEATURE_FLAG_SETTER(EnableDataShardWriteAlwaysVolatile)
FEATURE_FLAG_SETTER(EnableStreamingQueries)
FEATURE_FLAG_SETTER(EnableSecureScriptExecutions)
+ FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert)
#undef FEATURE_FLAG_SETTER
};
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 8688f9e52d8..d99f1dd916a 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -381,6 +381,7 @@ private:
THashMap<TString, ui32> columnByName;
THashSet<TString> keyColumnsLeft;
THashSet<TString> notNullColumnsLeft = entry.NotNullColumns;
+ THashSet<TString> defaultColumnsLeft;
SrcColumns.reserve(entry.Columns.size());
THashSet<TString> HasInternalConversion;
@@ -400,6 +401,10 @@ private:
keyColumnIds[keyOrder] = id;
keyColumnsLeft.insert(name);
}
+
+ if (colInfo.IsDefaultFromLiteral()) {
+ defaultColumnsLeft.insert(name);
+ }
}
if (entry.ColumnTableInfo) {
@@ -498,6 +503,10 @@ private:
NotNullColumns.emplace(ci.Name);
}
+ if (defaultColumnsLeft.contains(ci.Name)) {
+ defaultColumnsLeft.erase(ci.Name);
+ }
+
if (ci.KeyOrder != -1) {
KeyColumnPositions[ci.KeyOrder] = TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, pgTypeMod, notNull};
keyColumnsLeft.erase(ci.Name);
@@ -582,6 +591,21 @@ private:
return TConclusionStatus::Fail(Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str()));
}
+ if (!defaultColumnsLeft.empty() && UpsertIfExists) {
+ // some default columns are not specified in the request, but upsert will only update existing rows
+ // and only the columns specified in the request will be updated; unspecified default columns will not be changed.
+ defaultColumnsLeft.clear();
+ }
+
+ if (!defaultColumnsLeft.empty()) {
+ if (AppData(ctx)->FeatureFlags.GetDisableMissingDefaultColumnsInBulkUpsert()) {
+ return TConclusionStatus::Fail(Sprintf("Missing default columns: %s", JoinSeq(", ", defaultColumnsLeft).c_str()));
+ }
+
+ UploadCounters.OnMissingDefaultColumns();
+ LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Missing default columns: " << JoinSeq(", ", defaultColumnsLeft).c_str());
+ }
+
TConclusionStatus res = TConclusionStatus::Success();
if (isColumnTable && HasAppData() && AppDataVerified().ColumnShardConfig.GetBulkUpsertRequireAllColumns()) {
res = CheckRequiredColumns(entry, *reqColumns);
diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.cpp b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp
index 9e39f37298c..16fbd5ed4ac 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_counters.cpp
+++ b/ydb/core/tx/tx_proxy/upload_rows_counters.cpp
@@ -99,5 +99,6 @@ TUploadCounters::TUploadCounters()
WrittenBytes = TBase::GetDeriviative("Replies/WrittenBytes");
FailedBytes = TBase::GetDeriviative("Replies/FailedBytes");
RequestsBytes = TBase::GetDeriviative("Requests/Bytes");
+ MissingDefaultColumnsCount = TBase::GetDeriviative("MissingDefaultColumns/Count");
}
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_counters.h b/ydb/core/tx/tx_proxy/upload_rows_counters.h
index b71e94b3ab2..76d6cea8513 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_counters.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_counters.h
@@ -76,6 +76,8 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr FailedBytes;
NMonitoring::TDynamicCounters::TCounterPtr RequestsBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr MissingDefaultColumnsCount;
+
THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount;
NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status);
@@ -142,6 +144,10 @@ public:
PackageSizeCountByRecords->Collect(rowsCount);
RequestsBytes->Add(requestBytes);
}
+
+ void OnMissingDefaultColumns() {
+ MissingDefaultColumnsCount->Inc();
+ }
};
} // namespace NKikimr