aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpsushin <psushin@yandex-team.com>2025-01-28 15:51:16 +0300
committerpsushin <psushin@yandex-team.com>2025-01-28 16:07:40 +0300
commit6ee13b3691ee2cd77086f8a8ce9d0597a23f35fb (patch)
treec5a8a3aa36486870308887c17a67a85179ab9fec
parent2dd138fc52d464f57de7e7cfcea5f8b5a115f313 (diff)
downloadydb-6ee13b3691ee2cd77086f8a8ce9d0597a23f35fb.tar.gz
YT-3129: support table_index, row_index and range_index in input_query
commit_hash:c36a5ed1ebd42240703a835e70dd0375ab74c48d
-rw-r--r--yt/yt/client/table_client/schema.cpp75
-rw-r--r--yt/yt/client/table_client/schema.h27
-rw-r--r--yt/yt/client/unittests/schema_ut.cpp70
3 files changed, 155 insertions, 17 deletions
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index 9cdf711958..a1a328ac72 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -1021,6 +1021,43 @@ TTableSchemaPtr TTableSchema::ToQuery() const
}
}
+TTableSchemaPtr TTableSchema::WithSystemColumns(const TSystemColumnOptions& options) const
+{
+ std::vector<TColumnSchema> columns;
+ auto safeAdd = [&](const std::string& name, const ESimpleLogicalValueType type) {
+ const auto existing = FindColumn(name);
+ if (!existing) {
+ columns.push_back(TColumnSchema(name, type));
+ } else if (!existing->IsOfV1Type(type)) {
+ THROW_ERROR_EXCEPTION("Cannot add column %Qv because of type mismatch", name);
+ }
+ };
+
+ if (options.EnableTableIndex) {
+ safeAdd(TableIndexColumnName, ESimpleLogicalValueType::Int64);
+ }
+
+ if (options.EnableRowIndex) {
+ safeAdd(RowIndexColumnName, ESimpleLogicalValueType::Int64);
+ }
+
+ if (options.EnableRangeIndex) {
+ safeAdd(RangeIndexColumnName, ESimpleLogicalValueType::Int64);
+ }
+
+ if (ColumnInfo_) {
+ const auto& info = *ColumnInfo_;
+ columns.insert(columns.end(), info.Columns.begin(), info.Columns.end());
+ }
+
+ return New<TTableSchema>(
+ std::move(columns),
+ Strict_,
+ UniqueKeys_,
+ SchemaModification_,
+ DeletedColumns());
+}
+
TTableSchemaPtr TTableSchema::ToWriteViaQueueProducer() const
{
std::vector<TColumnSchema> columns;
@@ -1839,9 +1876,17 @@ void ValidateDynamicTableKeyColumnCount(int count)
void ValidateSystemColumnSchema(
const TColumnSchema& columnSchema,
bool isTableSorted,
- bool allowUnversionedUpdateColumns,
- bool allowTimestampColumns)
-{
+ const TSchemaValidationOptions& options)
+{
+ static const auto allowedOperationSystemColumns = THashMap<std::string, ESimpleLogicalValueType>{
+ // Operation system columns are appended in context of job input query.
+ // Table index column is intentionally disabled in the result of input query, since it can be arbitrarily
+ // changed by the query itself, and final values would violate assumptions about the possible values of
+ // $table_index inside format writers.
+ {RowIndexColumnName, ESimpleLogicalValueType::Int64},
+ {RangeIndexColumnName, ESimpleLogicalValueType::Int64},
+ };
+
static const auto allowedSortedTablesSystemColumns = THashMap<std::string, ESimpleLogicalValueType>{
{EmptyValueColumnName, ESimpleLogicalValueType::Int64},
{TtlColumnName, ESimpleLogicalValueType::Uint64},
@@ -1880,7 +1925,15 @@ void ValidateSystemColumnSchema(
return;
}
- if (allowUnversionedUpdateColumns) {
+ if (options.AllowOperationColumns) {
+ auto it = allowedOperationSystemColumns.find(name);
+ if (it != allowedOperationSystemColumns.end()) {
+ validateType(it->second);
+ return;
+ }
+ }
+
+ if (options.AllowUnversionedUpdateColumns) {
// Unversioned update schema system column.
if (name == TUnversionedUpdateSchema::ChangeTypeColumnName) {
validateType(ESimpleLogicalValueType::Uint64);
@@ -1894,7 +1947,7 @@ void ValidateSystemColumnSchema(
}
}
- if (allowTimestampColumns) {
+ if (options.AllowTimestampColumns) {
if (name.starts_with(TimestampColumnPrefix)) {
validateType(ESimpleLogicalValueType::Uint64);
return;
@@ -1924,8 +1977,7 @@ void ValidateColumnSchema(
const TColumnSchema& columnSchema,
bool isTableSorted,
bool isTableDynamic,
- bool allowUnversionedUpdateColumns,
- bool allowTimestampColumns)
+ const TSchemaValidationOptions& options)
{
static const auto allowedAggregates = THashSet<std::string, THash<TStringBuf>, TEqualTo<>>{
"sum",
@@ -1954,8 +2006,7 @@ void ValidateColumnSchema(
ValidateSystemColumnSchema(
columnSchema,
isTableSorted,
- allowUnversionedUpdateColumns,
- allowTimestampColumns);
+ options);
}
{
@@ -2293,8 +2344,7 @@ void ValidateSchemaAttributes(const TTableSchema& schema)
void ValidateTableSchema(
const TTableSchema& schema,
bool isTableDynamic,
- bool allowUnversionedUpdateColumns,
- bool allowTimestampColumns)
+ const TSchemaValidationOptions& options)
{
int totalTypeComplexity = 0;
for (const auto& column : schema.Columns()) {
@@ -2302,8 +2352,7 @@ void ValidateTableSchema(
column,
schema.IsSorted(),
isTableDynamic,
- allowUnversionedUpdateColumns,
- allowTimestampColumns);
+ options);
if (!schema.GetStrict() && column.IsRenamed()) {
THROW_ERROR_EXCEPTION("Renamed column %v in non-strict schema",
column.GetDiagnosticNameString());
diff --git a/yt/yt/client/table_client/schema.h b/yt/yt/client/table_client/schema.h
index ac36c07435..f6c9f147a1 100644
--- a/yt/yt/client/table_client/schema.h
+++ b/yt/yt/client/table_client/schema.h
@@ -230,6 +230,13 @@ public:
const TTableSchema& Schema_;
};
+ struct TSystemColumnOptions
+ {
+ bool EnableTableIndex;
+ bool EnableRowIndex;
+ bool EnableRangeIndex;
+ };
+
public:
const std::vector<TColumnSchema>& Columns() const;
const std::vector<TDeletedColumn>& DeletedColumns() const;
@@ -328,6 +335,9 @@ public:
//! For ordered tables, prepends the current schema with |(tablet_index, row_index)| key columns.
TTableSchemaPtr ToQuery() const;
+ //! Appends |$table_index|, |$row_index| and/or |$range_index|, based on the options.
+ TTableSchemaPtr WithSystemColumns(const TSystemColumnOptions& options) const;
+
//! For sorted tables, return the current schema without computed columns.
//! For ordered tables, prepends the current schema with |(tablet_index)| key column
//! but without |$timestamp| column, if any.
@@ -521,18 +531,27 @@ void ValidateDynamicTableKeyColumnCount(int count);
void ValidateColumnName(const std::string& name);
+////////////////////////////////////////////////////////////////////////////////
+
+struct TSchemaValidationOptions
+{
+ bool AllowUnversionedUpdateColumns = false;
+ bool AllowTimestampColumns = false;
+ bool AllowOperationColumns = false;
+};
+
void ValidateColumnSchema(
const TColumnSchema& columnSchema,
bool isTableSorted = false,
bool isTableDynamic = false,
- bool allowUnversionedUpdateColumns = false,
- bool allowTimestampColumns = false);
+ const TSchemaValidationOptions& options = {});
void ValidateTableSchema(
const TTableSchema& schema,
bool isTableDynamic = false,
- bool allowUnversionedUpdateColumns = false,
- bool allowTimestampColumns = false);
+ const TSchemaValidationOptions& options = {});
+
+////////////////////////////////////////////////////////////////////////////////
void ValidateNoDescendingSortOrder(const TTableSchema& schema);
diff --git a/yt/yt/client/unittests/schema_ut.cpp b/yt/yt/client/unittests/schema_ut.cpp
index 4c4b597f0d..83fc60b9d5 100644
--- a/yt/yt/client/unittests/schema_ut.cpp
+++ b/yt/yt/client/unittests/schema_ut.cpp
@@ -376,6 +376,37 @@ TEST(TTableSchemaTest, ColumnSchemaValidation)
{"foo", SimpleLogicalType(ESimpleLogicalValueType::Int64)},
{"bar", SimpleLogicalType(ESimpleLogicalValueType::String)},
}), ESortOrder::Ascending));
+
+ // Allow some names starting from SystemColumnNamePrefix
+ EXPECT_NO_THROW(
+ ValidateColumnSchema(
+ TColumnSchema(RowIndexColumnName, EValueType::Int64),
+ /*isTableSorted*/ false,
+ /*isTableDynamic*/ false,
+ /*options*/ {.AllowOperationColumns = true})
+ );
+ EXPECT_NO_THROW(
+ ValidateColumnSchema(
+ TColumnSchema(RangeIndexColumnName, EValueType::Int64),
+ /*isTableSorted*/ false,
+ /*isTableDynamic*/ false,
+ /*options*/ {.AllowOperationColumns = true})
+ );
+ EXPECT_THROW(
+ ValidateColumnSchema(
+ TColumnSchema(TableIndexColumnName, EValueType::Int64),
+ /*isTableSorted*/ false,
+ /*isTableDynamic*/ false,
+ /*options*/ {.AllowOperationColumns = true}),
+ std::exception);
+ EXPECT_THROW(
+ ValidateColumnSchema(
+ TColumnSchema(EmptyValueColumnName, EValueType::Int64),
+ /*isTableSorted*/ false,
+ /*isTableDynamic*/ false,
+ /*options*/ {.AllowOperationColumns = true}),
+ std::exception);
+
}
TEST(TTableSchemaTest, ValidateTableSchemaTest)
@@ -536,6 +567,45 @@ TEST(TTableSchemaTest, ValidateTableSchemaNestedColumns)
});
}
+TEST(TTableSchemaTest, WithSystemColumns)
+{
+ const auto schema1 = TTableSchema({
+ TColumnSchema("foo", SimpleLogicalType(ESimpleLogicalValueType::Int64)),
+ });
+
+ const auto schema2Ptr = schema1.WithSystemColumns({
+ .EnableRangeIndex = true,
+ });
+
+ EXPECT_EQ(schema2Ptr->Columns().size(), 2u);
+ EXPECT_TRUE(schema2Ptr->FindColumn("foo"));
+ EXPECT_TRUE(schema2Ptr->FindColumn(RangeIndexColumnName));
+
+ const auto schema3Ptr = schema1.WithSystemColumns({
+ .EnableTableIndex = true,
+ .EnableRowIndex = true,
+ .EnableRangeIndex = true,
+ });
+
+ EXPECT_EQ(schema3Ptr->Columns().size(), 4u);
+ EXPECT_TRUE(schema3Ptr->FindColumn("foo"));
+ EXPECT_TRUE(schema3Ptr->FindColumn(TableIndexColumnName));
+ EXPECT_TRUE(schema3Ptr->FindColumn(RowIndexColumnName));
+ EXPECT_TRUE(schema3Ptr->FindColumn(RangeIndexColumnName));
+
+ EXPECT_EQ(*schema3Ptr, *schema2Ptr->WithSystemColumns({
+ .EnableTableIndex = true,
+ .EnableRowIndex = true,
+ .EnableRangeIndex = true,
+ }));
+
+ EXPECT_THROW_WITH_SUBSTRING(
+ TTableSchema({
+ TColumnSchema(RowIndexColumnName, SimpleLogicalType(ESimpleLogicalValueType::String)),
+ }).WithSystemColumns({.EnableRowIndex = true}),
+ "Cannot add column");
+}
+
////////////////////////////////////////////////////////////////////////////////
TEST(TLockMaskTest, Simple)