about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author nadya02 <nadya02@yandex-team.com> 2023-10-13 08:15:37 +0300
committer nadya02 <nadya02@yandex-team.com> 2023-10-13 08:39:24 +0300
commit af4599178d9a2bc3e639d4b3c6af67388439cfee (patch)
tree 385bf57426afd4b3ac090c79547c165f52329c40
parent b8300096b6ed8bad5c54007d6d3e59152951ace8 (diff)
download ydb-af4599178d9a2bc3e639d4b3c6af67388439cfee.tar.gz
YT-17840: Fix null and void columns in Arrow
-rw-r--r-- yt/yt/client/arrow/arrow_row_stream_encoder.cpp 18
-rw-r--r-- yt/yt/library/formats/arrow_writer.cpp 33
2 files changed, 34 insertions, 17 deletions
diff --git a/yt/yt/client/arrow/arrow_row_stream_encoder.cpp b/yt/yt/client/arrow/arrow_row_stream_encoder.cpp
index b5988c3368..b6dc214821 100644
--- a/yt/yt/client/arrow/arrow_row_stream_encoder.cpp
+++ b/yt/yt/client/arrow/arrow_row_stream_encoder.cpp
@@ -51,6 +51,7 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
auto simpleType = CastToV1Type(schema.LogicalType()).first;
switch (simpleType) {
case ESimpleLogicalValueType::Null:
+ case ESimpleLogicalValueType::Void:
return std::make_tuple(
org::apache::arrow::flatbuf::Type_Null,
org::apache::arrow::flatbuf::CreateNull(*flatbufBuilder)
@@ -549,6 +550,13 @@ void SerializeBooleanColumn(
});
}
+void SerializeNullColumn(
+ const TTypedBatchColumn& typedColumn,
+ TRecordBatchSerializationContext* context)
+{
+ SerializeColumnPrologue(typedColumn, context);
+}
+
void SerializeColumn(
const TTypedBatchColumn& typedColumn,
TRecordBatchSerializationContext* context)
@@ -580,7 +588,9 @@ void SerializeColumn(
} else if (simpleType == ESimpleLogicalValueType::Boolean) {
SerializeBooleanColumn(typedColumn, context);
} else if (simpleType == ESimpleLogicalValueType::Null) {
- // No buffers are allocated for null columns.
+ SerializeNullColumn(typedColumn, context);
+ } else if (simpleType == ESimpleLogicalValueType::Void) {
+ SerializeNullColumn(typedColumn, context);
} else {
THROW_ERROR_EXCEPTION("Column %v has unexpected type %Qlv",
typedColumn.Column->Id,
@@ -871,11 +881,7 @@ private:
}
return std::nullopt;
}
- auto columnSchema = *columnSchemaPtr;
- if (CastToV1Type(columnSchema.LogicalType()).first != ESimpleLogicalValueType::Null) {
- return columnSchema;
- }
- return std::nullopt;
+ return *columnSchemaPtr;
}
void PrepareColumns()
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp
index cd8e720767..2c40309c96 100644
--- a/yt/yt/library/formats/arrow_writer.cpp
+++ b/yt/yt/library/formats/arrow_writer.cpp
@@ -60,6 +60,7 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
auto simpleType = CastToV1Type(schema.LogicalType()).first;
switch (simpleType) {
case ESimpleLogicalValueType::Null:
+ case ESimpleLogicalValueType::Void:
return std::make_tuple(
org::apache::arrow::flatbuf::Type_Null,
org::apache::arrow::flatbuf::CreateNull(*flatbufBuilder)
@@ -552,6 +553,13 @@ void SerializeBooleanColumn(
});
}
+void SerializeNullColumn(
+ const TTypedBatchColumn& typedColumn,
+ TRecordBatchSerializationContext* context)
+{
+ SerializeColumnPrologue(typedColumn, context);
+}
+
void SerializeColumn(
const TTypedBatchColumn& typedColumn,
TRecordBatchSerializationContext* context)
@@ -583,7 +591,9 @@ void SerializeColumn(
} else if (simpleType == ESimpleLogicalValueType::Boolean) {
SerializeBooleanColumn(typedColumn, context);
} else if (simpleType == ESimpleLogicalValueType::Null) {
- // No buffers are allocated for null columns.
+ SerializeNullColumn(typedColumn, context);
+ } else if (simpleType == ESimpleLogicalValueType::Void) {
+ SerializeNullColumn(typedColumn, context);
} else {
THROW_ERROR_EXCEPTION("Column %v has unexpected type %Qlv",
typedColumn.Column->Id,
@@ -650,6 +660,7 @@ public:
auto tableSchema = tableSchemas[0];
auto columnCount = NameTable_->GetSize();
+ SchemaExistenceFlags_.resize(columnCount, true);
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
ColumnSchemas_.push_back(GetColumnSchema(tableSchema, columnIndex));
@@ -719,6 +730,7 @@ private:
std::vector<TTypedBatchColumn> TypedColumns_;
std::vector<TColumnSchema> ColumnSchemas_;
std::vector<IUnversionedColumnarRowBatch::TDictionaryId> ArrowDictionaryIds_;
+ std::vector<bool> SchemaExistenceFlags_;
struct TMessage
{
@@ -737,22 +749,21 @@ private:
ControlAttributesConfig_->EnableTabletIndex && IsTabletIndexColumnId(columnIndex);
}
- bool CheckIfTypeIsNotNull(int columnIndex)
- {
- YT_VERIFY(columnIndex >= 0 && columnIndex < std::ssize(ColumnSchemas_));
- return CastToV1Type(ColumnSchemas_[columnIndex].LogicalType()).first != ESimpleLogicalValueType::Null;
- }
-
TColumnSchema GetColumnSchema(NTableClient::TTableSchemaPtr& tableSchema, int columnIndex)
{
YT_VERIFY(columnIndex >= 0);
+ SchemaExistenceFlags_[columnIndex] = true;
auto name = NameTable_->GetName(columnIndex);
auto columnSchema = tableSchema->FindColumn(name);
if (!columnSchema) {
- if (IsSystemColumnId(columnIndex) && CheckIfSystemColumnEnable(columnIndex)) {
- return TColumnSchema(TString(name), EValueType::Int64);
+ if (IsSystemColumnId(columnIndex)) {
+ if (CheckIfSystemColumnEnable(columnIndex)) {
+ return TColumnSchema(TString(name), EValueType::Int64);
+ }
+ SchemaExistenceFlags_[columnIndex] = false;
+ return TColumnSchema(TString(name), EValueType::Null);
}
- return TColumnSchema(TString(name), EValueType::Null);
+ THROW_ERROR_EXCEPTION("Column %v has no schema", name);
}
return *columnSchema;
}
@@ -761,7 +772,7 @@ private:
{
TypedColumns_.reserve(batchColumns.Size());
for (const auto* column : batchColumns) {
- if (CheckIfTypeIsNotNull(column->Id)) {
+ if (SchemaExistenceFlags_[column->Id]) {
YT_VERIFY(column->Id >= 0 && column->Id < std::ssize(ColumnSchemas_));
TypedColumns_.push_back(TTypedBatchColumn{
column,