about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov333 <ivanmorozov@ydb.tech> 2024-08-15 18:06:20 +0300
committer GitHub <noreply@github.com> 2024-08-15 18:06:20 +0300
commit 1159d25da45667e86d1bb091c80a3ccff9dc096b (patch)
tree 5858225e7b95a37132a23e6c1a324ff8f53fc0cc
parent e050f63a7554f983109111def4df805264624dac (diff)
download ydb-1159d25da45667e86d1bb091c80a3ccff9dc096b.tar.gz
Optional columns feature flags (#7814)
-rw-r--r-- ydb/core/formats/arrow/modifier/subset.h 6
-rw-r--r-- ydb/core/protos/feature_flags.proto 1
-rw-r--r-- ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp 7
-rw-r--r-- ydb/core/tx/columnshard/operations/slice_builder/builder.cpp 9
4 files changed, 19 insertions, 4 deletions
diff --git a/ydb/core/formats/arrow/modifier/subset.h b/ydb/core/formats/arrow/modifier/subset.h
index a638d8f1734..fc15d44e4fb 100644
--- a/ydb/core/formats/arrow/modifier/subset.h
+++ b/ydb/core/formats/arrow/modifier/subset.h
@@ -14,6 +14,12 @@ public:
TSchemaSubset() = default;
TSchemaSubset(const std::set<ui32>& fieldsIdx, const ui32 fieldsCount);
+ static TSchemaSubset AllFieldsAccepted() { // Factory: returns a default-constructed TSchemaSubset whose only modification is Exclude = true.
+ TSchemaSubset result;
+ result.Exclude = true; // No field indexes are added here; presumably an exclusion list with nothing in it lets every field through - TODO confirm against Apply().
+ return result;
+ }
+
template <class T>
std::vector<T> Apply(const std::vector<T>& fullSchema) const {
if (FieldIdx.empty()) {
diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto
index 23b50af10a4..0d92dc33076 100644
--- a/ydb/core/protos/feature_flags.proto
+++ b/ydb/core/protos/feature_flags.proto
@@ -152,4 +152,5 @@ message TFeatureFlags {
optional bool EnableVectorIndex = 133 [default = false];
optional bool EnableChangefeedsOnIndexTables = 134 [default = false];
optional bool EnableResourcePoolsCounters = 135 [default = false];
+ optional bool EnableOptionalColumnsInColumnShard = 136 [default = false];
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp
index 68dee9a3eb8..47a1bd12086 100644
--- a/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.cpp
@@ -53,7 +53,8 @@ TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> ISnapshotSchema::Normali
}
}
if (restoreColumnIds.contains(columnId)) {
- AFL_VERIFY(!!GetExternalDefaultValueVerified(columnId) || GetIndexInfo().IsNullableVerified(columnId));
+ AFL_VERIFY(!!GetExternalDefaultValueVerified(columnId) || GetIndexInfo().IsNullableVerified(columnId))("column_name",
+ GetIndexInfo().GetColumnName(columnId, false))("id", columnId);
result->AddField(resultField, GetColumnLoaderVerified(columnId)->BuildDefaultAccessor(batch->num_rows())).Validate();
}
}
@@ -114,8 +115,9 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, GetIndexInfo().GetPrimaryKey()));
switch (mType) {
+ case NEvWrite::EModificationType::Replace:
case NEvWrite::EModificationType::Upsert: {
- AFL_VERIFY(batch->num_columns() <= dstSchema->num_fields());
+ AFL_VERIFY(batch->num_columns() <= dstSchema->num_fields());
if (batch->num_columns() < dstSchema->num_fields()) {
for (auto&& f : dstSchema->fields()) {
if (GetIndexInfo().IsNullableVerified(f->name())) {
@@ -132,7 +134,6 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> ISnapshotSchema::PrepareForModi
return batch;
}
case NEvWrite::EModificationType::Delete:
- case NEvWrite::EModificationType::Replace:
case NEvWrite::EModificationType::Insert:
case NEvWrite::EModificationType::Update:
return batch;
diff --git a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp
index d195c505647..a5daa4f5d28 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder/builder.cpp
@@ -55,7 +55,14 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
if (OriginalBatch->num_columns() != indexSchema->num_fields()) {
AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())(
"index", indexSchema->num_fields());
-
+ if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard()) {
+ subset = NArrow::TSchemaSubset::AllFieldsAccepted();
+ const std::vector<ui32>& columnIdsVector = ActualSchema->GetIndexInfo().GetColumnIds(false);
+ const std::set<ui32> columnIdsSet(columnIdsVector.begin(), columnIdsVector.end());
+ auto normalized =
+ ActualSchema->NormalizeBatch(*ActualSchema, std::make_shared<NArrow::TGeneralContainer>(OriginalBatch), columnIdsSet).DetachResult();
+ OriginalBatch = NArrow::ToBatch(normalized->BuildTableVerified(), true);
+ }
}
WriteData.MutableWriteMeta().SetWriteMiddle2StartInstant(TMonotonic::Now());
auto batches = BuildSlices();