aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcherepashka <cherepashka@yandex-team.com>2024-09-09 16:05:38 +0300
committercherepashka <cherepashka@yandex-team.com>2024-09-09 16:21:30 +0300
commita995a7ffdddcde029c84fc4beec78263605a1472 (patch)
tree28eb8701ce93ca5967d2ce3568fae892f7782751
parent2d88b750a431123deaef5c629537bb40087c4077 (diff)
downloadydb-a995a7ffdddcde029c84fc4beec78263605a1472.tar.gz
YT-20696: Non materialized computed columns in schema
Added the `materialized` parameter to the column schema, plus a unit test. Evaluation of non-materialized computed columns will come in the next PR: <https://a.yandex-team.ru/review/5440960/details> f6ff92e3ffc403ad829e546203513b6b52840871
-rw-r--r--yt/yt/client/api/rpc_proxy/helpers.cpp18
-rw-r--r--yt/yt/client/api/rpc_proxy/table_mount_cache.cpp3
-rw-r--r--yt/yt/client/table_client/check_schema_compatibility.cpp9
-rw-r--r--yt/yt/client/table_client/schema.cpp58
-rw-r--r--yt/yt/client/table_client/schema.h7
-rw-r--r--yt/yt/client/table_client/schema_serialization_helpers.cpp5
-rw-r--r--yt/yt/core/misc/protobuf_helpers.h4
-rw-r--r--yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto1
-rw-r--r--yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto1
9 files changed, 87 insertions, 19 deletions
diff --git a/yt/yt/client/api/rpc_proxy/helpers.cpp b/yt/yt/client/api/rpc_proxy/helpers.cpp
index 9ebde85ea7..a89368573c 100644
--- a/yt/yt/client/api/rpc_proxy/helpers.cpp
+++ b/yt/yt/client/api/rpc_proxy/helpers.cpp
@@ -447,6 +447,11 @@ void ToProto(NProto::TColumnSchema* protoSchema, const NTableClient::TColumnSche
} else {
protoSchema->clear_expression();
}
+ if (schema.Materialized()) {
+ protoSchema->set_materialized(*schema.Materialized());
+ } else {
+ protoSchema->clear_materialized();
+ }
if (schema.Aggregate()) {
protoSchema->set_aggregate(*schema.Aggregate());
} else {
@@ -520,12 +525,13 @@ void FromProto(NTableClient::TColumnSchema* schema, const NProto::TColumnSchema&
}
schema->SetLogicalType(std::move(columnType));
- schema->SetLock(protoSchema.has_lock() ? std::make_optional(protoSchema.lock()) : std::nullopt);
- schema->SetExpression(protoSchema.has_expression() ? std::make_optional(protoSchema.expression()) : std::nullopt);
- schema->SetAggregate(protoSchema.has_aggregate() ? std::make_optional(protoSchema.aggregate()) : std::nullopt);
- schema->SetSortOrder(protoSchema.has_sort_order() ? std::make_optional(ESortOrder(protoSchema.sort_order())) : std::nullopt);
- schema->SetGroup(protoSchema.has_group() ? std::make_optional(protoSchema.group()) : std::nullopt);
- schema->SetMaxInlineHunkSize(protoSchema.has_max_inline_hunk_size() ? std::make_optional(protoSchema.max_inline_hunk_size()) : std::nullopt);
+ schema->SetLock(YT_PROTO_OPTIONAL(protoSchema, lock));
+ schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression));
+ schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized));
+ schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate));
+ schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>));
+ schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group));
+ schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size));
}
void ToProto(NProto::TTableSchema* protoSchema, const NTableClient::TTableSchema& schema)
diff --git a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
index 9eed25eb95..392312d511 100644
--- a/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
+++ b/yt/yt/client/api/rpc_proxy/table_mount_cache.cpp
@@ -66,7 +66,8 @@ private:
tableInfo->UpstreamReplicaId = FromProto<TTableReplicaId>(rsp->upstream_replica_id());
tableInfo->Dynamic = rsp->dynamic();
- tableInfo->NeedKeyEvaluation = primarySchema->HasComputedColumns();
+ // Non-materialized computed columns are always non-key columns.
+ tableInfo->NeedKeyEvaluation = primarySchema->HasMaterializedComputedColumns();
if (rsp->has_physical_path()) {
tableInfo->PhysicalPath = rsp->physical_path();
diff --git a/yt/yt/client/table_client/check_schema_compatibility.cpp b/yt/yt/client/table_client/check_schema_compatibility.cpp
index b1e5d34bae..a2297bf05d 100644
--- a/yt/yt/client/table_client/check_schema_compatibility.cpp
+++ b/yt/yt/client/table_client/check_schema_compatibility.cpp
@@ -85,6 +85,15 @@ std::pair<ESchemaCompatibility, TError> CheckTableSchemaCompatibilityImpl(
inputColumn->GetDiagnosticNameString()),
};
}
+
+ if (outputColumn.Materialized().value_or(true) != inputColumn->Materialized().value_or(true)) {
+ return {
+ ESchemaCompatibility::Incompatible,
+ TError("Column %v materialization mismatch",
+ inputColumn->GetDiagnosticNameString()),
+ };
+ }
+
if (outputColumn.Aggregate() && inputColumn->Aggregate() != outputColumn.Aggregate()) {
return {
ESchemaCompatibility::Incompatible,
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index 70b210966c..ac530ed758 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -221,6 +221,12 @@ TColumnSchema& TColumnSchema::SetExpression(std::optional<TString> value)
return *this;
}
+TColumnSchema& TColumnSchema::SetMaterialized(std::optional<bool> value)
+{
+ Materialized_ = std::move(value);
+ return *this;
+}
+
TColumnSchema& TColumnSchema::SetAggregate(std::optional<TString> value)
{
Aggregate_ = std::move(value);
@@ -349,6 +355,10 @@ void FormatValue(TStringBuilderBase* builder, const TColumnSchema& schema, TStri
builder->AppendFormat("; expression=%Qv", *expression);
}
+ if (const auto& materialized = schema.Materialized()) {
+ builder->AppendFormat("; materialized=%Qv", *materialized);
+ }
+
if (const auto& aggregate = schema.Aggregate()) {
builder->AppendFormat("; aggregate=%v", *aggregate);
}
@@ -408,6 +418,11 @@ void ToProto(NProto::TColumnSchema* protoSchema, const TColumnSchema& schema)
} else {
protoSchema->clear_expression();
}
+ if (schema.Materialized()) {
+ protoSchema->set_materialized(*schema.Materialized());
+ } else {
+ protoSchema->clear_materialized();
+ }
if (schema.Aggregate()) {
protoSchema->set_aggregate(*schema.Aggregate());
} else {
@@ -456,12 +471,13 @@ void FromProto(TColumnSchema* schema, const NProto::TColumnSchema& protoSchema)
schema->SetLogicalType(MakeLogicalType(GetLogicalType(physicalType), protoSchema.required()));
}
- schema->SetLock(protoSchema.has_lock() ? std::make_optional(protoSchema.lock()) : std::nullopt);
- schema->SetExpression(protoSchema.has_expression() ? std::make_optional(protoSchema.expression()) : std::nullopt);
- schema->SetAggregate(protoSchema.has_aggregate() ? std::make_optional(protoSchema.aggregate()) : std::nullopt);
- schema->SetSortOrder(protoSchema.has_sort_order() ? std::make_optional(CheckedEnumCast<ESortOrder>(protoSchema.sort_order())) : std::nullopt);
- schema->SetGroup(protoSchema.has_group() ? std::make_optional(protoSchema.group()) : std::nullopt);
- schema->SetMaxInlineHunkSize(protoSchema.has_max_inline_hunk_size() ? std::make_optional(protoSchema.max_inline_hunk_size()) : std::nullopt);
+ schema->SetLock(YT_PROTO_OPTIONAL(protoSchema, lock));
+ schema->SetExpression(YT_PROTO_OPTIONAL(protoSchema, expression));
+ schema->SetMaterialized(YT_PROTO_OPTIONAL(protoSchema, materialized));
+ schema->SetAggregate(YT_PROTO_OPTIONAL(protoSchema, aggregate));
+ schema->SetSortOrder(YT_APPLY_PROTO_OPTIONAL(protoSchema, sort_order, CheckedEnumCast<ESortOrder>));
+ schema->SetGroup(YT_PROTO_OPTIONAL(protoSchema, group));
+ schema->SetMaxInlineHunkSize(YT_PROTO_OPTIONAL(protoSchema, max_inline_hunk_size));
}
void FromProto(TDeletedColumn* schema, const NProto::TDeletedColumn& protoSchema)
@@ -549,7 +565,11 @@ TTableSchema::TTableSchema(
++KeyColumnCount_;
}
if (column.Expression()) {
- HasComputedColumns_ = true;
+ if (column.Materialized().value_or(true)) {
+ HasMaterializedComputedColumns_ = true;
+ } else {
+ HasNonMaterializedComputedColumns_ = true;
+ }
}
if (column.Aggregate()) {
HasAggregateColumns_ = true;
@@ -702,9 +722,19 @@ TTableSchemaPtr TTableSchema::Filter(const std::optional<std::vector<TString>>&
return Filter(THashSet<TString>(columnNames->begin(), columnNames->end()), discardSortOrder);
}
+bool TTableSchema::HasMaterializedComputedColumns() const
+{
+ return HasMaterializedComputedColumns_;
+}
+
+bool TTableSchema::HasNonMaterializedComputedColumns() const
+{
+ return HasNonMaterializedComputedColumns_;
+}
+
bool TTableSchema::HasComputedColumns() const
{
- return HasComputedColumns_;
+ return HasMaterializedComputedColumns() || HasNonMaterializedComputedColumns();
}
bool TTableSchema::HasAggregateColumns() const
@@ -1551,6 +1581,7 @@ bool operator==(const TColumnSchema& lhs, const TColumnSchema& rhs)
lhs.SortOrder() == rhs.SortOrder() &&
lhs.Lock() == rhs.Lock() &&
lhs.Expression() == rhs.Expression() &&
+ lhs.Materialized() == rhs.Materialized() &&
lhs.Aggregate() == rhs.Aggregate() &&
lhs.Group() == rhs.Group() &&
lhs.MaxInlineHunkSize() == rhs.MaxInlineHunkSize();
@@ -1902,9 +1933,13 @@ void ValidateColumnSchema(
}
ValidateSchemaValueType(columnSchema.GetWireType());
-
- if (columnSchema.Expression() && !columnSchema.SortOrder() && isTableDynamic) {
- THROW_ERROR_EXCEPTION("Non-key column cannot be computed");
+ if (columnSchema.Expression()) {
+ auto materialized = columnSchema.Materialized().value_or(true);
+ if (materialized && !columnSchema.SortOrder() && isTableDynamic) {
+ THROW_ERROR_EXCEPTION("Non-key column cannot be computed in materializable way");
+ } else if (!materialized && columnSchema.SortOrder()) {
+ THROW_ERROR_EXCEPTION("Key column cannot be computed in non-materializable way");
+ }
}
if (columnSchema.Aggregate() && columnSchema.SortOrder()) {
@@ -2418,6 +2453,7 @@ size_t THash<NYT::NTableClient::TColumnSchema>::operator()(const NYT::NTableClie
columnSchema.SortOrder(),
columnSchema.Lock(),
columnSchema.Expression(),
+ columnSchema.Materialized(),
columnSchema.Aggregate(),
columnSchema.Group(),
columnSchema.MaxInlineHunkSize());
diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h
index 149d0c1a04..4fd3c3f486 100644
--- a/yt/yt/client/table_client/schema.h
+++ b/yt/yt/client/table_client/schema.h
@@ -122,6 +122,7 @@ public:
DEFINE_BYREF_RO_PROPERTY(std::optional<ESortOrder>, SortOrder);
DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Lock);
DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Expression);
+ DEFINE_BYREF_RO_PROPERTY(std::optional<bool>, Materialized);
DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Aggregate);
DEFINE_BYREF_RO_PROPERTY(std::optional<TString>, Group);
DEFINE_BYREF_RO_PROPERTY(bool, Required);
@@ -156,6 +157,7 @@ public:
TColumnSchema& SetSortOrder(std::optional<ESortOrder> value);
TColumnSchema& SetLock(std::optional<TString> value);
TColumnSchema& SetExpression(std::optional<TString> value);
+ TColumnSchema& SetMaterialized(std::optional<bool> value);
TColumnSchema& SetAggregate(std::optional<TString> value);
TColumnSchema& SetGroup(std::optional<TString> value);
TColumnSchema& SetRequired(bool value);
@@ -277,6 +279,8 @@ public:
const std::optional<std::vector<TString>>& columnNames,
bool discardSortOrder = false) const;
+ bool HasMaterializedComputedColumns() const;
+ bool HasNonMaterializedComputedColumns() const;
bool HasComputedColumns() const;
bool HasAggregateColumns() const;
bool HasHunkColumns() const;
@@ -410,7 +414,8 @@ private:
std::shared_ptr<const TColumnInfo> ColumnInfo_;
int KeyColumnCount_ = 0;
- bool HasComputedColumns_ = false;
+ bool HasMaterializedComputedColumns_ = false;
+ bool HasNonMaterializedComputedColumns_ = false;
bool HasAggregateColumns_ = false;
THunkColumnIds HunkColumnsIds_;
diff --git a/yt/yt/client/table_client/schema_serialization_helpers.cpp b/yt/yt/client/table_client/schema_serialization_helpers.cpp
index 2190137b08..4f344101d8 100644
--- a/yt/yt/client/table_client/schema_serialization_helpers.cpp
+++ b/yt/yt/client/table_client/schema_serialization_helpers.cpp
@@ -42,6 +42,8 @@ void TSerializableColumnSchema::Register(TRegistrar registrar)
.Default();
registrar.BaseClassParameter("expression", &TThis::Expression_)
.Default();
+ registrar.BaseClassParameter("materialized", &TThis::Materialized_)
+ .Default();
registrar.BaseClassParameter("aggregate", &TThis::Aggregate_)
.Default();
registrar.BaseClassParameter("sort_order", &TThis::SortOrder_)
@@ -81,6 +83,9 @@ void TSerializableColumnSchema::DeserializeFromCursor(NYson::TYsonPullParserCurs
} else if (key == TStringBuf("expression")) {
cursor->Next();
SetExpression(ExtractTo<std::optional<TString>>(cursor));
+ } else if (key == TStringBuf("materialized")) {
+ cursor->Next();
+ SetMaterialized(ExtractTo<std::optional<bool>>(cursor));
} else if (key == TStringBuf("aggregate")) {
cursor->Next();
SetAggregate(ExtractTo<std::optional<TString>>(cursor));
diff --git a/yt/yt/core/misc/protobuf_helpers.h b/yt/yt/core/misc/protobuf_helpers.h
index c20ace8017..d9ec6b1bc0 100644
--- a/yt/yt/core/misc/protobuf_helpers.h
+++ b/yt/yt/core/misc/protobuf_helpers.h
@@ -395,6 +395,10 @@ google::protobuf::Timestamp GetProtoNow();
? std::optional(YT_PROTO_OPTIONAL_CONVERT(__VA_ARGS__)((message).field())) \
: std::nullopt)
+// TODO(cherepashka): remove once std::optional::transform is available.
+//! This macro extracts an std::optional from an optional protobuf message field, applying the given function to the value if it is present (and yielding std::nullopt otherwise).
+#define YT_APPLY_PROTO_OPTIONAL(message, field, function) (((message).has_##field()) ? std::make_optional(function((message).field())) : std::nullopt)
+
////////////////////////////////////////////////////////////////////////////////
// TODO(gritukan): This is a hack that allows to use proper string type in the protobuf-related code.
diff --git a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
index 607dcdb06a..f77ee77747 100644
--- a/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
+++ b/yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto
@@ -1398,6 +1398,7 @@ message TColumnSchema
optional string lock = 4;
optional string expression = 5;
+ optional bool materialized = 13;
optional string aggregate = 6;
optional int32 sort_order = 7;
optional string group = 8;
diff --git a/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto b/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto
index 7ca77a0622..d41156814a 100644
--- a/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto
+++ b/yt/yt_proto/yt/client/table_chunk_format/proto/chunk_meta.proto
@@ -86,6 +86,7 @@ message TColumnSchema
optional string lock = 3;
optional string expression = 4;
+ optional bool materialized = 13;
optional string aggregate = 5;
optional int32 sort_order = 6;
optional string group = 7;