diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-23 18:08:39 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-23 18:30:22 +0300 |
commit | 50c69e8394f3c3e1afef75878953a4c4691d4383 (patch) | |
tree | 420a48ebfc10cd90e352785bf39f125e686ab08b /yt | |
parent | 5aecdb67595db1a47c933b7d8da2cb662a50e185 (diff) | |
download | ydb-50c69e8394f3c3e1afef75878953a4c4691d4383.tar.gz |
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/library/formats/arrow_writer.cpp | 261 |
1 files changed, 254 insertions, 7 deletions
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index e0aabc5b79..b4b8853571 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -82,6 +82,39 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali IsIntegralTypeSigned(simpleType)) .Union()); + case ESimpleLogicalValueType::Interval: + return std::make_tuple( + org::apache::arrow::flatbuf::Type_Int, + org::apache::arrow::flatbuf::CreateInt( + *flatbufBuilder, + 64, + true) + .Union()); + + case ESimpleLogicalValueType::Date: + return std::make_tuple( + org::apache::arrow::flatbuf::Type_Date, + org::apache::arrow::flatbuf::CreateDate( + *flatbufBuilder, + org::apache::arrow::flatbuf::DateUnit_DAY) + .Union()); + + case ESimpleLogicalValueType::Datetime: + return std::make_tuple( + org::apache::arrow::flatbuf::Type_Date, + org::apache::arrow::flatbuf::CreateDate( + *flatbufBuilder, + org::apache::arrow::flatbuf::DateUnit_MILLISECOND) + .Union()); + + case ESimpleLogicalValueType::Timestamp: + return std::make_tuple( + org::apache::arrow::flatbuf::Type_Timestamp, + org::apache::arrow::flatbuf::CreateTimestamp( + *flatbufBuilder, + org::apache::arrow::flatbuf::TimeUnit_MICROSECOND) + .Union()); + case ESimpleLogicalValueType::Double: return std::make_tuple( org::apache::arrow::flatbuf::Type_FloatingPoint, @@ -90,6 +123,14 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali org::apache::arrow::flatbuf::Precision_DOUBLE) .Union()); + case ESimpleLogicalValueType::Float: + return std::make_tuple( + org::apache::arrow::flatbuf::Type_FloatingPoint, + org::apache::arrow::flatbuf::CreateFloatingPoint( + *flatbufBuilder, + org::apache::arrow::flatbuf::Precision_SINGLE) + .Union()); + case ESimpleLogicalValueType::Boolean: return std::make_tuple( org::apache::arrow::flatbuf::Type_Bool, @@ -109,12 +150,6 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali org::apache::arrow::flatbuf::CreateUtf8(*flatbufBuilder) .Union()); - // TODO(babenko): the following types are not supported: - // Date - // Datetime - // Interval - // Timestamp - default: THROW_ERROR_EXCEPTION("Column %v has type %Qlv that is not currently supported by Arrow encoder", schema.GetDiagnosticNameString(), @@ -122,6 +157,27 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali } } +int GetIntegralLikeTypeByteSize(ESimpleLogicalValueType type) +{ + switch (type) { + case ESimpleLogicalValueType::Int8: + case ESimpleLogicalValueType::Uint8: + return 1; + case ESimpleLogicalValueType::Int16: + case ESimpleLogicalValueType::Uint16: + return 2; + case ESimpleLogicalValueType::Int32: + case ESimpleLogicalValueType::Uint32: + return 4; + case ESimpleLogicalValueType::Int64: + case ESimpleLogicalValueType::Uint64: + case ESimpleLogicalValueType::Interval: + return 8; + default: + YT_ABORT(); + } +} + bool IsRleButNotDictionaryEncodedStringLikeColumn(const TBatchColumn& column) { auto simpleType = CastToV1Type(column.Type).first; @@ -389,7 +445,7 @@ void SerializeIntegerColumn( SerializeColumnPrologue(typedColumn, context); context->AddBuffer( - column->ValueCount * GetIntegralTypeByteSize(simpleType), + column->ValueCount * GetIntegralLikeTypeByteSize(simpleType), [=] (TMutableRef dstRef) { const auto* valueColumn = column->Rle ? column->Rle->ValueColumn @@ -431,6 +487,7 @@ void SerializeIntegerColumn( XX(ui16, Uint16) XX(ui32, Uint32) XX(ui64, Uint64) + XX(i64, Interval) #undef XX @@ -442,6 +499,157 @@ void SerializeIntegerColumn( }); } +void SerializeDateColumn( + const TTypedBatchColumn& typedColumn, + TRecordBatchSerializationContext* context) +{ + const auto* column = typedColumn.Column; + YT_VERIFY(column->Values); + + YT_LOG_DEBUG("Adding data column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)", + column->Id, + column->StartIndex, + column->ValueCount, + column->Rle.has_value()); + + SerializeColumnPrologue(typedColumn, context); + + context->AddBuffer( + column->ValueCount * sizeof(i32), + [=] (TMutableRef dstRef) { + const auto* valueColumn = column->Rle + ? column->Rle->ValueColumn + : column; + auto values = valueColumn->GetTypedValues<ui64>(); + + auto rleIndexes = column->Rle + ? column->GetTypedValues<ui64>() + : TRange<ui64>(); + + auto startIndex = column->StartIndex; + + auto dstValues = GetTypedValues<i32>(dstRef); + auto* currentOutput = dstValues.Begin(); + DecodeIntegerVector( + startIndex, + startIndex + column->ValueCount, + valueColumn->Values->BaseValue, + valueColumn->Values->ZigZagEncoded, + TRange<ui32>(), + rleIndexes, + [&] (auto index) { + return values[index]; + }, + [&] (auto value) { + if (value > std::numeric_limits<i32>::max()) { + THROW_ERROR_EXCEPTION("Date value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, std::numeric_limits<i32>::max()); + } + *currentOutput++ = value; + }); + }); +} + +void SerializeDatetimeColumn( + const TTypedBatchColumn& typedColumn, + TRecordBatchSerializationContext* context) +{ + const auto* column = typedColumn.Column; + const auto maxAllowedValue = std::numeric_limits<i64>::max() / 1000; + YT_VERIFY(column->Values); + + YT_LOG_DEBUG("Adding datetime column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)", + column->Id, + column->StartIndex, + column->ValueCount, + column->Rle.has_value()); + + SerializeColumnPrologue(typedColumn, context); + + context->AddBuffer( + column->ValueCount * sizeof(i64), + [=] (TMutableRef dstRef) { + const auto* valueColumn = column->Rle + ? column->Rle->ValueColumn + : column; + auto values = valueColumn->GetTypedValues<ui64>(); + + auto rleIndexes = column->Rle + ? column->GetTypedValues<ui64>() + : TRange<ui64>(); + + auto startIndex = column->StartIndex; + + auto dstValues = GetTypedValues<i64>(dstRef); + auto* currentOutput = dstValues.Begin(); + DecodeIntegerVector( + startIndex, + startIndex + column->ValueCount, + valueColumn->Values->BaseValue, + valueColumn->Values->ZigZagEncoded, + TRange<ui32>(), + rleIndexes, + [&] (auto index) { + return values[index]; + }, + [&] (auto value) { + if(value > maxAllowedValue) { + THROW_ERROR_EXCEPTION("Datetime value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, maxAllowedValue); + } + *currentOutput++ = value * 1000; + }); + }); +} + +void SerializeTimestampColumn( + const TTypedBatchColumn& typedColumn, + TRecordBatchSerializationContext* context) +{ + const auto* column = typedColumn.Column; + YT_VERIFY(column->Values); + + YT_LOG_DEBUG("Adding timestamp column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)", + column->Id, + column->StartIndex, + column->ValueCount, + column->Rle.has_value()); + + SerializeColumnPrologue(typedColumn, context); + + context->AddBuffer( + column->ValueCount * sizeof(i64), + [=] (TMutableRef dstRef) { + const auto* valueColumn = column->Rle + ? column->Rle->ValueColumn + : column; + auto values = valueColumn->GetTypedValues<ui64>(); + + auto rleIndexes = column->Rle + ? column->GetTypedValues<ui64>() + : TRange<ui64>(); + + auto startIndex = column->StartIndex; + + auto dstValues = GetTypedValues<i64>(dstRef); + auto* currentOutput = dstValues.Begin(); + DecodeIntegerVector( + startIndex, + startIndex + column->ValueCount, + valueColumn->Values->BaseValue, + valueColumn->Values->ZigZagEncoded, + TRange<ui32>(), + rleIndexes, + [&] (auto index) { + return values[index]; + }, + [&] (auto value) { + if (value > std::numeric_limits<i64>::max()) { + THROW_ERROR_EXCEPTION("Timestamp value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, std::numeric_limits<i64>::max()); + } + *currentOutput++ = value; + }); + }); +} + void SerializeDoubleColumn( const TTypedBatchColumn& typedColumn, TRecordBatchSerializationContext* context) @@ -471,6 +679,35 @@ void SerializeDoubleColumn( }); } +void SerializeFloatColumn( + const TTypedBatchColumn& typedColumn, + TRecordBatchSerializationContext* context) +{ + const auto* column = typedColumn.Column; + YT_VERIFY(column->Values); + YT_VERIFY(column->Values->BitWidth == 32); + YT_VERIFY(column->Values->BaseValue == 0); + YT_VERIFY(!column->Values->ZigZagEncoded); + + YT_LOG_DEBUG("Adding float column (ColumnId: %v, StartIndex: %v, ValueCount: %v)", + column->Id, + column->StartIndex, + column->ValueCount, + column->Rle.has_value()); + + SerializeColumnPrologue(typedColumn, context); + + context->AddBuffer( + column->ValueCount * sizeof(float), + [=] (TMutableRef dstRef) { + auto relevantValues = column->GetRelevantTypedValues<float>(); + ::memcpy( + dstRef.Begin(), + relevantValues.Begin(), + column->ValueCount * sizeof(float)); + }); +} + void SerializeStringLikeColumn( const TTypedBatchColumn& typedColumn, TRecordBatchSerializationContext* context) @@ -584,8 +821,18 @@ void SerializeColumn( auto simpleType = CastToV1Type(typedColumn.Type).first; if (IsIntegralType(simpleType)) { SerializeIntegerColumn(typedColumn, simpleType, context); + } else if (simpleType == ESimpleLogicalValueType::Interval) { + SerializeIntegerColumn(typedColumn, simpleType, context); + } else if (simpleType == ESimpleLogicalValueType::Date) { + SerializeDateColumn(typedColumn, context); + } else if (simpleType == ESimpleLogicalValueType::Datetime) { + SerializeDatetimeColumn(typedColumn, context); + } else if (simpleType == ESimpleLogicalValueType::Timestamp) { + SerializeTimestampColumn(typedColumn, context); } else if (simpleType == ESimpleLogicalValueType::Double) { SerializeDoubleColumn(typedColumn, context); + } else if (simpleType == ESimpleLogicalValueType::Float) { + SerializeFloatColumn(typedColumn, context); } else if (IsStringLikeType(simpleType)) { SerializeStringLikeColumn(typedColumn, context); } else if (simpleType == ESimpleLogicalValueType::Boolean) { |