diff options
author | psushin <psushin@yandex-team.com> | 2025-01-28 15:51:16 +0300 |
---|---|---|
committer | psushin <psushin@yandex-team.com> | 2025-01-28 16:07:40 +0300 |
commit | 6ee13b3691ee2cd77086f8a8ce9d0597a23f35fb (patch) | |
tree | c5a8a3aa36486870308887c17a67a85179ab9fec | |
parent | 2dd138fc52d464f57de7e7cfcea5f8b5a115f313 (diff) | |
download | ydb-6ee13b3691ee2cd77086f8a8ce9d0597a23f35fb.tar.gz |
YT-3129: support table_index, row_index and range_index in input_query
commit_hash:c36a5ed1ebd42240703a835e70dd0375ab74c48d
-rw-r--r-- | yt/yt/client/table_client/schema.cpp | 75 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.h | 27 | ||||
-rw-r--r-- | yt/yt/client/unittests/schema_ut.cpp | 70 |
3 files changed, 155 insertions, 17 deletions
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp index 9cdf711958..a1a328ac72 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -1021,6 +1021,43 @@ TTableSchemaPtr TTableSchema::ToQuery() const } } +TTableSchemaPtr TTableSchema::WithSystemColumns(const TSystemColumnOptions& options) const +{ + std::vector<TColumnSchema> columns; + auto safeAdd = [&](const std::string& name, const ESimpleLogicalValueType type) { + const auto existing = FindColumn(name); + if (!existing) { + columns.push_back(TColumnSchema(name, type)); + } else if (!existing->IsOfV1Type(type)) { + THROW_ERROR_EXCEPTION("Cannot add column %Qv because of type mismatch", name); + } + }; + + if (options.EnableTableIndex) { + safeAdd(TableIndexColumnName, ESimpleLogicalValueType::Int64); + } + + if (options.EnableRowIndex) { + safeAdd(RowIndexColumnName, ESimpleLogicalValueType::Int64); + } + + if (options.EnableRangeIndex) { + safeAdd(RangeIndexColumnName, ESimpleLogicalValueType::Int64); + } + + if (ColumnInfo_) { + const auto& info = *ColumnInfo_; + columns.insert(columns.end(), info.Columns.begin(), info.Columns.end()); + } + + return New<TTableSchema>( + std::move(columns), + Strict_, + UniqueKeys_, + SchemaModification_, + DeletedColumns()); +} + TTableSchemaPtr TTableSchema::ToWriteViaQueueProducer() const { std::vector<TColumnSchema> columns; @@ -1839,9 +1876,17 @@ void ValidateDynamicTableKeyColumnCount(int count) void ValidateSystemColumnSchema( const TColumnSchema& columnSchema, bool isTableSorted, - bool allowUnversionedUpdateColumns, - bool allowTimestampColumns) -{ + const TSchemaValidationOptions& options) +{ + static const auto allowedOperationSystemColumns = THashMap<std::string, ESimpleLogicalValueType>{ + // Operation system columns are appended in context of job input query. + // Table index column is intentionally disabled in the result of input query, since it can be arbitrarily + // changed by the query itself, and final values would violate assumptions about the possible values of + // $table_index inside format writers. + {RowIndexColumnName, ESimpleLogicalValueType::Int64}, + {RangeIndexColumnName, ESimpleLogicalValueType::Int64}, + }; + static const auto allowedSortedTablesSystemColumns = THashMap<std::string, ESimpleLogicalValueType>{ {EmptyValueColumnName, ESimpleLogicalValueType::Int64}, {TtlColumnName, ESimpleLogicalValueType::Uint64}, @@ -1880,7 +1925,15 @@ void ValidateSystemColumnSchema( return; } - if (allowUnversionedUpdateColumns) { + if (options.AllowOperationColumns) { + auto it = allowedOperationSystemColumns.find(name); + if (it != allowedOperationSystemColumns.end()) { + validateType(it->second); + return; + } + } + + if (options.AllowUnversionedUpdateColumns) { // Unversioned update schema system column. if (name == TUnversionedUpdateSchema::ChangeTypeColumnName) { validateType(ESimpleLogicalValueType::Uint64); @@ -1894,7 +1947,7 @@ void ValidateSystemColumnSchema( } } - if (allowTimestampColumns) { + if (options.AllowTimestampColumns) { if (name.starts_with(TimestampColumnPrefix)) { validateType(ESimpleLogicalValueType::Uint64); return; @@ -1924,8 +1977,7 @@ void ValidateColumnSchema( const TColumnSchema& columnSchema, bool isTableSorted, bool isTableDynamic, - bool allowUnversionedUpdateColumns, - bool allowTimestampColumns) + const TSchemaValidationOptions& options) { static const auto allowedAggregates = THashSet<std::string, THash<TStringBuf>, TEqualTo<>>{ "sum", @@ -1954,8 +2006,7 @@ void ValidateColumnSchema( ValidateSystemColumnSchema( columnSchema, isTableSorted, - allowUnversionedUpdateColumns, - allowTimestampColumns); + options); } { @@ -2293,8 +2344,7 @@ void ValidateSchemaAttributes(const TTableSchema& schema) void ValidateTableSchema( const TTableSchema& schema, bool isTableDynamic, - bool allowUnversionedUpdateColumns, - bool allowTimestampColumns) + const TSchemaValidationOptions& options) { int totalTypeComplexity = 0; for (const auto& column : schema.Columns()) { @@ -2302,8 +2352,7 @@ void ValidateTableSchema( column, schema.IsSorted(), isTableDynamic, - allowUnversionedUpdateColumns, - allowTimestampColumns); + options); if (!schema.GetStrict() && column.IsRenamed()) { THROW_ERROR_EXCEPTION("Renamed column %v in non-strict schema", column.GetDiagnosticNameString()); diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h index ac36c07435..f6c9f147a1 100644 --- a/yt/yt/client/table_client/schema.h +++ b/yt/yt/client/table_client/schema.h @@ -230,6 +230,13 @@ public: const TTableSchema& Schema_; }; + struct TSystemColumnOptions + { + bool EnableTableIndex; + bool EnableRowIndex; + bool EnableRangeIndex; + }; + public: const std::vector<TColumnSchema>& Columns() const; const std::vector<TDeletedColumn>& DeletedColumns() const; @@ -328,6 +335,9 @@ public: //! For ordered tables, prepends the current schema with |(tablet_index, row_index)| key columns. TTableSchemaPtr ToQuery() const; + //! Appends |$table_index|, |$row_index| and/or |$range_index|, based on the options. + TTableSchemaPtr WithSystemColumns(const TSystemColumnOptions& options) const; + //! For sorted tables, return the current schema without computed columns. //! For ordered tables, prepends the current schema with |(tablet_index)| key column //! but without |$timestamp| column, if any. @@ -521,18 +531,27 @@ void ValidateDynamicTableKeyColumnCount(int count); void ValidateColumnName(const std::string& name); +//////////////////////////////////////////////////////////////////////////////// + +struct TSchemaValidationOptions +{ + bool AllowUnversionedUpdateColumns = false; + bool AllowTimestampColumns = false; + bool AllowOperationColumns = false; +}; + void ValidateColumnSchema( const TColumnSchema& columnSchema, bool isTableSorted = false, bool isTableDynamic = false, - bool allowUnversionedUpdateColumns = false, - bool allowTimestampColumns = false); + const TSchemaValidationOptions& options = {}); void ValidateTableSchema( const TTableSchema& schema, bool isTableDynamic = false, - bool allowUnversionedUpdateColumns = false, - bool allowTimestampColumns = false); + const TSchemaValidationOptions& options = {}); + +//////////////////////////////////////////////////////////////////////////////// void ValidateNoDescendingSortOrder(const TTableSchema& schema); diff --git a/yt/yt/client/unittests/schema_ut.cpp b/yt/yt/client/unittests/schema_ut.cpp index 4c4b597f0d..83fc60b9d5 100644 --- a/yt/yt/client/unittests/schema_ut.cpp +++ b/yt/yt/client/unittests/schema_ut.cpp @@ -376,6 +376,37 @@ TEST(TTableSchemaTest, ColumnSchemaValidation) {"foo", SimpleLogicalType(ESimpleLogicalValueType::Int64)}, {"bar", SimpleLogicalType(ESimpleLogicalValueType::String)}, }), ESortOrder::Ascending)); + + // Allow some names starting from SystemColumnNamePrefix + EXPECT_NO_THROW( + ValidateColumnSchema( + TColumnSchema(RowIndexColumnName, EValueType::Int64), + /*isTableSorted*/ false, + /*isTableDynamic*/ false, + /*options*/ {.AllowOperationColumns = true}) + ); + EXPECT_NO_THROW( + ValidateColumnSchema( + TColumnSchema(RangeIndexColumnName, EValueType::Int64), + /*isTableSorted*/ false, + /*isTableDynamic*/ false, + /*options*/ {.AllowOperationColumns = true}) + ); + EXPECT_THROW( + ValidateColumnSchema( + TColumnSchema(TableIndexColumnName, EValueType::Int64), + /*isTableSorted*/ false, + /*isTableDynamic*/ false, + /*options*/ {.AllowOperationColumns = true}), + std::exception); + EXPECT_THROW( + ValidateColumnSchema( + TColumnSchema(EmptyValueColumnName, EValueType::Int64), + /*isTableSorted*/ false, + /*isTableDynamic*/ false, + /*options*/ {.AllowOperationColumns = true}), + std::exception); + } TEST(TTableSchemaTest, ValidateTableSchemaTest) @@ -536,6 +567,45 @@ TEST(TTableSchemaTest, ValidateTableSchemaNestedColumns) }); } +TEST(TTableSchemaTest, WithSystemColumns) +{ + const auto schema1 = TTableSchema({ + TColumnSchema("foo", SimpleLogicalType(ESimpleLogicalValueType::Int64)), + }); + + const auto schema2Ptr = schema1.WithSystemColumns({ + .EnableRangeIndex = true, + }); + + EXPECT_EQ(schema2Ptr->Columns().size(), 2u); + EXPECT_TRUE(schema2Ptr->FindColumn("foo")); + EXPECT_TRUE(schema2Ptr->FindColumn(RangeIndexColumnName)); + + const auto schema3Ptr = schema1.WithSystemColumns({ + .EnableTableIndex = true, + .EnableRowIndex = true, + .EnableRangeIndex = true, + }); + + EXPECT_EQ(schema3Ptr->Columns().size(), 4u); + EXPECT_TRUE(schema3Ptr->FindColumn("foo")); + EXPECT_TRUE(schema3Ptr->FindColumn(TableIndexColumnName)); + EXPECT_TRUE(schema3Ptr->FindColumn(RowIndexColumnName)); + EXPECT_TRUE(schema3Ptr->FindColumn(RangeIndexColumnName)); + + EXPECT_EQ(*schema3Ptr, *schema2Ptr->WithSystemColumns({ + .EnableTableIndex = true, + .EnableRowIndex = true, + .EnableRangeIndex = true, + })); + + EXPECT_THROW_WITH_SUBSTRING( + TTableSchema({ + TColumnSchema(RowIndexColumnName, SimpleLogicalType(ESimpleLogicalValueType::String)), + }).WithSystemColumns({.EnableRowIndex = true}), + "Cannot add column"); +} + //////////////////////////////////////////////////////////////////////////////// TEST(TLockMaskTest, Simple) |