diff options
author | nadya02 <nadya02@yandex-team.com> | 2023-10-13 08:15:37 +0300 |
---|---|---|
committer | nadya02 <nadya02@yandex-team.com> | 2023-10-13 08:39:24 +0300 |
commit | af4599178d9a2bc3e639d4b3c6af67388439cfee (patch) | |
tree | 385bf57426afd4b3ac090c79547c165f52329c40 | |
parent | b8300096b6ed8bad5c54007d6d3e59152951ace8 (diff) | |
download | ydb-af4599178d9a2bc3e639d4b3c6af67388439cfee.tar.gz |
YT-17840: Fix null and void columns in Arrow
-rw-r--r-- | yt/yt/client/arrow/arrow_row_stream_encoder.cpp | 18 | ||||
-rw-r--r-- | yt/yt/library/formats/arrow_writer.cpp | 33 |
2 files changed, 34 insertions, 17 deletions
diff --git a/yt/yt/client/arrow/arrow_row_stream_encoder.cpp b/yt/yt/client/arrow/arrow_row_stream_encoder.cpp index b5988c3368..b6dc214821 100644 --- a/yt/yt/client/arrow/arrow_row_stream_encoder.cpp +++ b/yt/yt/client/arrow/arrow_row_stream_encoder.cpp @@ -51,6 +51,7 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali auto simpleType = CastToV1Type(schema.LogicalType()).first; switch (simpleType) { case ESimpleLogicalValueType::Null: + case ESimpleLogicalValueType::Void: return std::make_tuple( org::apache::arrow::flatbuf::Type_Null, org::apache::arrow::flatbuf::CreateNull(*flatbufBuilder) @@ -549,6 +550,13 @@ void SerializeBooleanColumn( }); } +void SerializeNullColumn( + const TTypedBatchColumn& typedColumn, + TRecordBatchSerializationContext* context) +{ + SerializeColumnPrologue(typedColumn, context); +} + void SerializeColumn( const TTypedBatchColumn& typedColumn, TRecordBatchSerializationContext* context) @@ -580,7 +588,9 @@ void SerializeColumn( } else if (simpleType == ESimpleLogicalValueType::Boolean) { SerializeBooleanColumn(typedColumn, context); } else if (simpleType == ESimpleLogicalValueType::Null) { - // No buffers are allocated for null columns. + SerializeNullColumn(typedColumn, context); + } else if (simpleType == ESimpleLogicalValueType::Void) { + SerializeNullColumn(typedColumn, context); } else { THROW_ERROR_EXCEPTION("Column %v has unexpected type %Qlv", typedColumn.Column->Id, @@ -871,11 +881,7 @@ private: } return std::nullopt; } - auto columnSchema = *columnSchemaPtr; - if (CastToV1Type(columnSchema.LogicalType()).first != ESimpleLogicalValueType::Null) { - return columnSchema; - } - return std::nullopt; + return *columnSchemaPtr; } void PrepareColumns() diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index cd8e720767..2c40309c96 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -60,6 +60,7 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali auto simpleType = CastToV1Type(schema.LogicalType()).first; switch (simpleType) { case ESimpleLogicalValueType::Null: + case ESimpleLogicalValueType::Void: return std::make_tuple( org::apache::arrow::flatbuf::Type_Null, org::apache::arrow::flatbuf::CreateNull(*flatbufBuilder) @@ -552,6 +553,13 @@ void SerializeBooleanColumn( }); } +void SerializeNullColumn( + const TTypedBatchColumn& typedColumn, + TRecordBatchSerializationContext* context) +{ + SerializeColumnPrologue(typedColumn, context); +} + void SerializeColumn( const TTypedBatchColumn& typedColumn, TRecordBatchSerializationContext* context) @@ -583,7 +591,9 @@ void SerializeColumn( } else if (simpleType == ESimpleLogicalValueType::Boolean) { SerializeBooleanColumn(typedColumn, context); } else if (simpleType == ESimpleLogicalValueType::Null) { - // No buffers are allocated for null columns. + SerializeNullColumn(typedColumn, context); + } else if (simpleType == ESimpleLogicalValueType::Void) { + SerializeNullColumn(typedColumn, context); } else { THROW_ERROR_EXCEPTION("Column %v has unexpected type %Qlv", typedColumn.Column->Id, @@ -650,6 +660,7 @@ public: auto tableSchema = tableSchemas[0]; auto columnCount = NameTable_->GetSize(); + SchemaExistenceFlags_.resize(columnCount, true); for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) { ColumnSchemas_.push_back(GetColumnSchema(tableSchema, columnIndex)); @@ -719,6 +730,7 @@ private: std::vector<TTypedBatchColumn> TypedColumns_; std::vector<TColumnSchema> ColumnSchemas_; std::vector<IUnversionedColumnarRowBatch::TDictionaryId> ArrowDictionaryIds_; + std::vector<bool> SchemaExistenceFlags_; struct TMessage { @@ -737,22 +749,21 @@ private: ControlAttributesConfig_->EnableTabletIndex && IsTabletIndexColumnId(columnIndex); } - bool CheckIfTypeIsNotNull(int columnIndex) - { - YT_VERIFY(columnIndex >= 0 && columnIndex < std::ssize(ColumnSchemas_)); - return CastToV1Type(ColumnSchemas_[columnIndex].LogicalType()).first != ESimpleLogicalValueType::Null; - } - TColumnSchema GetColumnSchema(NTableClient::TTableSchemaPtr& tableSchema, int columnIndex) { YT_VERIFY(columnIndex >= 0); + SchemaExistenceFlags_[columnIndex] = true; auto name = NameTable_->GetName(columnIndex); auto columnSchema = tableSchema->FindColumn(name); if (!columnSchema) { - if (IsSystemColumnId(columnIndex) && CheckIfSystemColumnEnable(columnIndex)) { - return TColumnSchema(TString(name), EValueType::Int64); + if (IsSystemColumnId(columnIndex)) { + if (CheckIfSystemColumnEnable(columnIndex)) { + return TColumnSchema(TString(name), EValueType::Int64); + } + SchemaExistenceFlags_[columnIndex] = false; + return TColumnSchema(TString(name), EValueType::Null); } - return TColumnSchema(TString(name), EValueType::Null); + THROW_ERROR_EXCEPTION("Column %v has no schema", name); } return *columnSchema; } @@ -761,7 +772,7 @@ private: { TypedColumns_.reserve(batchColumns.Size()); for (const auto* column : batchColumns) { - if (CheckIfTypeIsNotNull(column->Id)) { + if (SchemaExistenceFlags_[column->Id]) { YT_VERIFY(column->Id >= 0 && column->Id < std::ssize(ColumnSchemas_)); TypedColumns_.push_back(TTypedBatchColumn{ column, |