diff options
author | cherepashka <cherepashka@yandex-team.com> | 2024-09-09 16:05:38 +0300 |
---|---|---|
committer | cherepashka <cherepashka@yandex-team.com> | 2024-09-09 16:21:30 +0300 |
commit | a995a7ffdddcde029c84fc4beec78263605a1472 (patch) | |
tree | 28eb8701ce93ca5967d2ce3568fae892f7782751 | |
parent | 2d88b750a431123deaef5c629537bb40087c4077 (diff) | |
download | ydb-a995a7ffdddcde029c84fc4beec78263605a1472.tar.gz |
YT-20696: Non materialized computed columns in schema
Added `materialized` parameter into column schema & unittest
Evaluations of non-materialized computed columns will be in next PR: <https://a.yandex-team.ru/review/5440960/details>
f6ff92e3ffc403ad829e546203513b6b52840871
-rw-r--r-- | yt/yt/client/api/rpc_proxy/helpers.cpp | 18 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/table_mount_cache.cpp | 3 | ||||
-rw-r--r-- | yt/yt/client/table_client/check_schema_compatibility.cpp | 9 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.cpp | 58 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.h | 7 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema_serialization_helpers.cpp | 5 | ||||
-rw-r--r-- | yt/yt/core/misc/protobuf_helpers.h | 4 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto | 1 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto | 1 |
9 files changed, 87 insertions, 19 deletions
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp index 9ebde85ea7..a89368573c 100644 --- a/yt/yt/client/api/rpc_proxy/helpers.cpp +++ b/yt/yt/client/api/rpc_proxy/helpers.cpp @@ -447,6 +447,11 @@ void ToProto(NProto::TColumnSchema* protoSchema, const NTableClient::TColumnSche } else { protoSchema->clear_expression(); } + if (schema.Materialized()) { + protoSchema->set_materialized(*schema.Materialized()); + } else { + protoSchema->clear_materialized(); + } if (schema.Aggregate()) { protoSchema->set_aggregate(*schema.Aggregate()); } else { @@ -520,12 +525,13 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema& } schema->SetLogicalType(std::move(columnType)); - schema->SetLock(protoSchema.has_lock() ? std::make_optional(protoSchema.lock()) : std::nullopt); - schema->SetExpression(protoSchema.has_expression() ? std::make_optional(protoSchema.expression()) : std::nullopt); - schema->SetAggregate(protoSchema.has_aggregate() ? std::make_optional(protoSchema.aggregate()) : std::nullopt); - schema->SetSortOrder(protoSchema.has_sort_order() ? std::make_optional(ESortOrder(protoSchema.sort_order())) : std::nullopt); - schema->SetGroup(protoSchema.has_group() ? std::make_optional(protoSchema.group()) : std::nullopt); - schema->SetMaxInlineHunkSize(protoSchema.has_max_inline_hunk_size() ? std::make_optional(protoSchema.max_inline_hunk_size()) : std::nullopt); + schema->SetLock(YT_PROTO_OPTIONAL(protoSchema, lock)); + schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression)); + schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized)); + schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate)); + schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>)); + schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group)); + schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size)); } void ToProto(NProto::TTableSchema* protoSchema, const NTableClient::TTableSchema& schema) diff --git a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp index 9eed25eb95..392312d511 100644 --- a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp +++ b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp @@ -66,7 +66,8 @@ private: tableInfo->UpstreamReplicaId = FromProto<TTableReplicaId>(rsp->upstream_replica_id()); tableInfo->Dynamic = rsp->dynamic(); - tableInfo->NeedKeyEvaluation = primarySchema->HasComputedColumns(); + // Non-materialized computed columns are always non-key columns. + tableInfo->NeedKeyEvaluation = primarySchema->HasMaterializedComputedColumns(); if (rsp->has_physical_path()) { tableInfo->PhysicalPath = rsp->physical_path(); diff --git a/yt/yt/client/table_client/check_schema_compatibility.cpp b/yt/yt/client/table_client/check_schema_compatibility.cpp index b1e5d34bae..a2297bf05d 100644 --- a/yt/yt/client/table_client/check_schema_compatibility.cpp +++ b/yt/yt/client/table_client/check_schema_compatibility.cpp @@ -85,6 +85,15 @@ std::pair<ESchemaCompatibility, TError> CheckTableSchemaCompatibilityImpl( inputColumn->GetDiagnosticNameString()), }; } + + if (outputColumn.Materialized().value_or(true) != inputColumn->Materialized().value_or(true)) { + return { + ESchemaCompatibility::Incompatible, + TError("Column %v materialization mismatch", + inputColumn->GetDiagnosticNameString()), + }; + } + if (outputColumn.Aggregate() && inputColumn->Aggregate() != outputColumn.Aggregate()) { return { ESchemaCompatibility::Incompatible, diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index 70b210966c..ac530ed758 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -221,6 +221,12 @@ TColumnSchema& TColumnSchema::SetExpression(std::optional<TString> value) return *this; } +TColumnSchema& TColumnSchema::SetMaterialized(std::optional<bool> value) +{ + Materialized_ = std::move(value); + return *this; +} + TColumnSchema& TColumnSchema::SetAggregate(std::optional<TString> value) { Aggregate_ = std::move(value); @@ -349,6 +355,10 @@ void FormatValue(TStringBuilderBase* builder, const TColumnSchema& schema, TStri builder->AppendFormat("; expression=%Qv", *expression); } + if (const auto& materialized = schema.Materialized()) { + builder->AppendFormat("; materialized=%Qv", *materialized); + } + if (const auto& aggregate = schema.Aggregate()) { builder->AppendFormat("; aggregate=%v", *aggregate); } @@ -408,6 +418,11 @@ void ToProto(NProto::TColumnSchema* protoSchema, const TColumnSchema& schema) } else { protoSchema->clear_expression(); } + if (schema.Materialized()) { + protoSchema->set_materialized(*schema.Materialized()); + } else { + protoSchema->clear_materialized(); + } if (schema.Aggregate()) { protoSchema->set_aggregate(*schema.Aggregate()); } else { @@ -456,12 +471,13 @@ void FromProto(TColumnSchema* schema, const NProto::TColumnSchema& protoSchema) schema->SetLogicalType(MakeLogicalType(GetLogicalType(physicalType), protoSchema.required())); } - schema->SetLock(protoSchema.has_lock() ? std::make_optional(protoSchema.lock()) : std::nullopt); - schema->SetExpression(protoSchema.has_expression() ? std::make_optional(protoSchema.expression()) : std::nullopt); - schema->SetAggregate(protoSchema.has_aggregate() ? std::make_optional(protoSchema.aggregate()) : std::nullopt); - schema->SetSortOrder(protoSchema.has_sort_order() ? std::make_optional(CheckedEnumCast<ESortOrder>(protoSchema.sort_order())) : std::nullopt); - schema->SetGroup(protoSchema.has_group() ? std::make_optional(protoSchema.group()) : std::nullopt); - schema->SetMaxInlineHunkSize(protoSchema.has_max_inline_hunk_size() ? std::make_optional(protoSchema.max_inline_hunk_size()) : std::nullopt); + schema->SetLock(YT_PROTO_OPTIONAL(protoSchema, lock)); + schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression)); + schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized)); + schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate)); + schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>)); + schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group)); + schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size)); } void FromProto(TDeletedColumn* schema, const NProto::TDeletedColumn& protoSchema) @@ -549,7 +565,11 @@ TTableSchema::TTableSchema( ++KeyColumnCount_; } if (column.Expression()) { - HasComputedColumns_ = true; + if (column.Materialized().value_or(true)) { + HasMaterializedComputedColumns_ = true; + } else { + HasNonMaterializedComputedColumns_ = true; + } } if (column.Aggregate()) { HasAggregateColumns_ = true; @@ -702,9 +722,19 @@ TTableSchemaPtr TTableSchema::Filter(const std::optional<std::vector<TString>>& return Filter(THashSet<TString>(columnNames->begin(), columnNames->end()), discardSortOrder); } +bool TTableSchema::HasMaterializedComputedColumns() const +{ + return HasMaterializedComputedColumns_; +} + +bool TTableSchema::HasNonMaterializedComputedColumns() const +{ + return HasNonMaterializedComputedColumns_; +} + bool TTableSchema::HasComputedColumns() const { - return HasComputedColumns_; + return HasMaterializedComputedColumns() || HasNonMaterializedComputedColumns(); } bool TTableSchema::HasAggregateColumns() const @@ -1551,6 +1581,7 @@ bool operator==(const TColumnSchema& lhs, const TColumnSchema& rhs) lhs.SortOrder() == rhs.SortOrder() && lhs.Lock() == rhs.Lock() && lhs.Expression() == rhs.Expression() && + lhs.Materialized() == rhs.Materialized() && lhs.Aggregate() == rhs.Aggregate() && lhs.Group() == rhs.Group() && lhs.MaxInlineHunkSize() == rhs.MaxInlineHunkSize(); @@ -1902,9 +1933,13 @@ void ValidateColumnSchema( } ValidateSchemaValueType(columnSchema.GetWireType()); - - if (columnSchema.Expression() && !columnSchema.SortOrder() && isTableDynamic) { - THROW_ERROR_EXCEPTION("Non-key column cannot be computed"); + if (columnSchema.Expression()) { + auto materialized = columnSchema.Materialized().value_or(true); + if (materialized && !columnSchema.SortOrder() && isTableDynamic) { + THROW_ERROR_EXCEPTION("Non-key column cannot be computed in materializable way"); + } else if (!materialized && columnSchema.SortOrder()) { + THROW_ERROR_EXCEPTION("Key column cannot be computed in non-materializable way"); + } } if (columnSchema.Aggregate() && columnSchema.SortOrder()) { @@ -2418,6 +2453,7 @@ size_t THash<NYT::NTableClient::TColumnSchema>::operator()(const NYT::NTableClie columnSchema.SortOrder(), columnSchema.Lock(), columnSchema.Expression(), + columnSchema.Materialized(), columnSchema.Aggregate(), columnSchema.Group(), columnSchema.MaxInlineHunkSize()); diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h index 149d0c1a04..4fd3c3f486 100644 --- a/yt/yt/client/table_client/schema.h +++ b/yt/yt/client/table_client/schema.h @@ -122,6 +122,7 @@ public: DEFINE_BYREF_RO_PROPERTY(std::optional<ESortOrder>, SortOrder); DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Lock); DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Expression); + DEFINE_BYREF_RO_PROPERTY(std::optional<bool>, Materialized); DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Aggregate); DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Group); DEFINE_BYREF_RO_PROPERTY(bool, Required); @@ -156,6 +157,7 @@ public: TColumnSchema& SetSortOrder(std::optional<ESortOrder> value); TColumnSchema& SetLock(std::optional<TString> value); TColumnSchema& SetExpression(std::optional<TString> value); + TColumnSchema& SetMaterialized(std::optional<bool> value); TColumnSchema& SetAggregate(std::optional<TString> value); TColumnSchema& SetGroup(std::optional<TString> value); TColumnSchema& SetRequired(bool value); @@ -277,6 +279,8 @@ public: const std::optional<std::vector<TString>>& columnNames, bool discardSortOrder = false) const; + bool HasMaterializedComputedColumns() const; + bool HasNonMaterializedComputedColumns() const; bool HasComputedColumns() const; bool HasAggregateColumns() const; bool HasHunkColumns() const; @@ -410,7 +414,8 @@ private: std::shared_ptr<const TColumnInfo> ColumnInfo_; int KeyColumnCount_ = 0; - bool HasComputedColumns_ = false; + bool HasMaterializedComputedColumns_ = false; + bool HasNonMaterializedComputedColumns_ = false; bool HasAggregateColumns_ = false; THunkColumnIds HunkColumnsIds_; diff --git a/yt/yt/client/table_client/schema_serialization_helpers.cpp b/yt/yt/client/table_client/schema_serialization_helpers.cpp index 2190137b08..4f344101d8 100644 --- a/yt/yt/client/table_client/schema_serialization_helpers.cpp +++ b/yt/yt/client/table_client/schema_serialization_helpers.cpp @@ -42,6 +42,8 @@ void TSerializableColumnSchema::Register(TRegistrar registrar) .Default(); registrar.BaseClassParameter("expression", &TThis::Expression_) .Default(); + registrar.BaseClassParameter("materialized", &TThis::Materialized_) + .Default(); registrar.BaseClassParameter("aggregate", &TThis::Aggregate_) .Default(); registrar.BaseClassParameter("sort_order", &TThis::SortOrder_) @@ -81,6 +83,9 @@ void TSerializableColumnSchema::DeserializeFromCursor(NYson::TYsonPullParserCurs } else if (key == TStringBuf("expression")) { cursor->Next(); SetExpression(ExtractTo<std::optional<TString>>(cursor)); + } else if (key == TStringBuf("materialized")) { + cursor->Next(); + SetMaterialized(ExtractTo<std::optional<bool>>(cursor)); } else if (key == TStringBuf("aggregate")) { cursor->Next(); SetAggregate(ExtractTo<std::optional<TString>>(cursor)); diff --git a/yt/yt/core/misc/protobuf_helpers.h b/yt/yt/core/misc/protobuf_helpers.h index c20ace8017..d9ec6b1bc0 100644 --- a/yt/yt/core/misc/protobuf_helpers.h +++ b/yt/yt/core/misc/protobuf_helpers.h @@ -395,6 +395,10 @@ google::protobuf::Timestamp GetProtoNow(); ? std::optional(YT_PROTO_OPTIONAL_CONVERT(__VA_ARGS__)((message).field())) \ : std::nullopt) +// TODO(cherepashka): to remove after std::optional::and_then is here. +//! This macro may be used to extract std::optional<T> from protobuf message field of type T and to apply some function to value if it is present. +#define YT_APPLY_PROTO_OPTIONAL(message, field, function) (((message).has_##field()) ? std::make_optional(function((message).field())) : std::nullopt) + //////////////////////////////////////////////////////////////////////////////// // TODO(gritukan): This is a hack that allows to use proper string type in the protobuf-related code. diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto index 607dcdb06a..f77ee77747 100644 --- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto +++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto @@ -1398,6 +1398,7 @@ message TColumnSchema optional string lock = 4; optional string expression = 5; + optional bool materialized = 13; optional string aggregate = 6; optional int32 sort_order = 7; optional string group = 8; diff --git a/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto b/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto index 7ca77a0622..d41156814a 100644 --- a/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto +++ b/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto @@ -86,6 +86,7 @@ message TColumnSchema optional string lock = 3; optional string expression = 4; + optional bool materialized = 13; optional string aggregate = 5; optional int32 sort_order = 6; optional string group = 7; |