diff options
| author | robot-piglet <[email protected]> | 2023-11-23 18:08:39 +0300 | 
|---|---|---|
| committer | robot-piglet <[email protected]> | 2023-11-23 18:30:22 +0300 | 
| commit | 50c69e8394f3c3e1afef75878953a4c4691d4383 (patch) | |
| tree | 420a48ebfc10cd90e352785bf39f125e686ab08b | |
| parent | 5aecdb67595db1a47c933b7d8da2cb662a50e185 (diff) | |
Intermediate changes
| -rw-r--r-- | yt/yt/library/formats/arrow_writer.cpp | 261 | 
1 files changed, 254 insertions, 7 deletions
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index e0aabc5b79f..b4b88535714 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -82,6 +82,39 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali                      IsIntegralTypeSigned(simpleType))                      .Union()); +        case ESimpleLogicalValueType::Interval: +            return std::make_tuple( +                org::apache::arrow::flatbuf::Type_Int, +                org::apache::arrow::flatbuf::CreateInt( +                    *flatbufBuilder, +                    64, +                    true) +                    .Union()); + +        case ESimpleLogicalValueType::Date: +            return std::make_tuple( +                org::apache::arrow::flatbuf::Type_Date, +                org::apache::arrow::flatbuf::CreateDate( +                    *flatbufBuilder, +                    org::apache::arrow::flatbuf::DateUnit_DAY) +                    .Union()); + +        case ESimpleLogicalValueType::Datetime: +            return std::make_tuple( +                org::apache::arrow::flatbuf::Type_Date, +                org::apache::arrow::flatbuf::CreateDate( +                    *flatbufBuilder, +                    org::apache::arrow::flatbuf::DateUnit_MILLISECOND) +                    .Union()); + +        case ESimpleLogicalValueType::Timestamp: +            return std::make_tuple( +                org::apache::arrow::flatbuf::Type_Timestamp, +                org::apache::arrow::flatbuf::CreateTimestamp( +                    *flatbufBuilder, +                    org::apache::arrow::flatbuf::TimeUnit_MICROSECOND) +                    .Union()); +          case ESimpleLogicalValueType::Double:              return std::make_tuple(                  org::apache::arrow::flatbuf::Type_FloatingPoint, @@ -90,6 +123,14 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali                      org::apache::arrow::flatbuf::Precision_DOUBLE)                      .Union()); +        case ESimpleLogicalValueType::Float: +            return std::make_tuple( +                org::apache::arrow::flatbuf::Type_FloatingPoint, +                org::apache::arrow::flatbuf::CreateFloatingPoint( +                    *flatbufBuilder, +                    org::apache::arrow::flatbuf::Precision_SINGLE) +                    .Union()); +          case ESimpleLogicalValueType::Boolean:              return std::make_tuple(                  org::apache::arrow::flatbuf::Type_Bool, @@ -109,12 +150,6 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali                  org::apache::arrow::flatbuf::CreateUtf8(*flatbufBuilder)                      .Union()); -            // TODO(babenko): the following types are not supported: -            //   Date -            //   Datetime -            //   Interval -            //   Timestamp -          default:              THROW_ERROR_EXCEPTION("Column %v has type %Qlv that is not currently supported by Arrow encoder",                  schema.GetDiagnosticNameString(), @@ -122,6 +157,27 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali      }  } +int GetIntegralLikeTypeByteSize(ESimpleLogicalValueType type) +{ +    switch (type) { +        case ESimpleLogicalValueType::Int8: +        case ESimpleLogicalValueType::Uint8: +            return 1; +        case ESimpleLogicalValueType::Int16: +        case ESimpleLogicalValueType::Uint16: +            return 2; +        case ESimpleLogicalValueType::Int32: +        case ESimpleLogicalValueType::Uint32: +            return 4; +        case ESimpleLogicalValueType::Int64: +        case ESimpleLogicalValueType::Uint64: +        case ESimpleLogicalValueType::Interval: +            return 8; +        default: +            YT_ABORT(); +    } +} +  bool IsRleButNotDictionaryEncodedStringLikeColumn(const TBatchColumn& column)  {      auto simpleType = CastToV1Type(column.Type).first; @@ -389,7 +445,7 @@ void SerializeIntegerColumn(      SerializeColumnPrologue(typedColumn, context);      context->AddBuffer( -        column->ValueCount * GetIntegralTypeByteSize(simpleType), +        column->ValueCount * GetIntegralLikeTypeByteSize(simpleType),          [=] (TMutableRef dstRef) {              const auto* valueColumn = column->Rle                  ? column->Rle->ValueColumn @@ -431,6 +487,7 @@ void SerializeIntegerColumn(                  XX(ui16, Uint16)                  XX(ui32, Uint32)                  XX(ui64, Uint64) +                XX(i64, Interval)  #undef XX @@ -442,6 +499,157 @@ void SerializeIntegerColumn(          });  } +void SerializeDateColumn( +    const TTypedBatchColumn& typedColumn, +    TRecordBatchSerializationContext* context) +{ +    const auto* column = typedColumn.Column; +    YT_VERIFY(column->Values); + +    YT_LOG_DEBUG("Adding data column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)", +        column->Id, +        column->StartIndex, +        column->ValueCount, +        column->Rle.has_value()); + +    SerializeColumnPrologue(typedColumn, context); + +    context->AddBuffer( +        column->ValueCount * sizeof(i32), +        [=] (TMutableRef dstRef) { +            const auto* valueColumn = column->Rle +                ? column->Rle->ValueColumn +                : column; +            auto values = valueColumn->GetTypedValues<ui64>(); + +            auto rleIndexes = column->Rle +                ? column->GetTypedValues<ui64>() +                : TRange<ui64>(); + +            auto startIndex = column->StartIndex; + +            auto dstValues = GetTypedValues<i32>(dstRef); +            auto* currentOutput = dstValues.Begin(); +            DecodeIntegerVector( +                startIndex, +                startIndex + column->ValueCount, +                valueColumn->Values->BaseValue, +                valueColumn->Values->ZigZagEncoded, +                TRange<ui32>(), +                rleIndexes, +                [&] (auto index) { +                    return values[index]; +                }, +                [&] (auto value) { +                    if (value > std::numeric_limits<i32>::max()) { +                        THROW_ERROR_EXCEPTION("Date value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, std::numeric_limits<i32>::max()); +                    } +                    *currentOutput++ = value; +                }); +        }); +} + +void SerializeDatetimeColumn( +    const TTypedBatchColumn& typedColumn, +    TRecordBatchSerializationContext* context) +{ +    const auto* column = typedColumn.Column; +    const auto maxAllowedValue = std::numeric_limits<i64>::max() / 1000; +    YT_VERIFY(column->Values); + +    YT_LOG_DEBUG("Adding datetime column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)", +        column->Id, +        column->StartIndex, +        column->ValueCount, +        column->Rle.has_value()); + +    SerializeColumnPrologue(typedColumn, context); + +    context->AddBuffer( +        column->ValueCount * sizeof(i64), +        [=] (TMutableRef dstRef) { +            const auto* valueColumn = column->Rle +                ? column->Rle->ValueColumn +                : column; +            auto values = valueColumn->GetTypedValues<ui64>(); + +            auto rleIndexes = column->Rle +                ? column->GetTypedValues<ui64>() +                : TRange<ui64>(); + +            auto startIndex = column->StartIndex; + +            auto dstValues = GetTypedValues<i64>(dstRef); +            auto* currentOutput = dstValues.Begin(); +            DecodeIntegerVector( +                startIndex, +                startIndex + column->ValueCount, +                valueColumn->Values->BaseValue, +                valueColumn->Values->ZigZagEncoded, +                TRange<ui32>(), +                rleIndexes, +                [&] (auto index) { +                    return values[index]; +                }, +                [&] (auto value) { +                    if(value > maxAllowedValue) { +                        THROW_ERROR_EXCEPTION("Datetime value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, maxAllowedValue); +                    } +                    *currentOutput++ = value * 1000; +                }); +        }); +} + +void SerializeTimestampColumn( +    const TTypedBatchColumn& typedColumn, +    TRecordBatchSerializationContext* context) +{ +    const auto* column = typedColumn.Column; +    YT_VERIFY(column->Values); + +    YT_LOG_DEBUG("Adding timestamp column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)", +        column->Id, +        column->StartIndex, +        column->ValueCount, +        column->Rle.has_value()); + +    SerializeColumnPrologue(typedColumn, context); + +    context->AddBuffer( +        column->ValueCount * sizeof(i64), +        [=] (TMutableRef dstRef) { +            const auto* valueColumn = column->Rle +                ? column->Rle->ValueColumn +                : column; +            auto values = valueColumn->GetTypedValues<ui64>(); + +            auto rleIndexes = column->Rle +                ? column->GetTypedValues<ui64>() +                : TRange<ui64>(); + +            auto startIndex = column->StartIndex; + +            auto dstValues = GetTypedValues<i64>(dstRef); +            auto* currentOutput = dstValues.Begin(); +            DecodeIntegerVector( +                startIndex, +                startIndex + column->ValueCount, +                valueColumn->Values->BaseValue, +                valueColumn->Values->ZigZagEncoded, +                TRange<ui32>(), +                rleIndexes, +                [&] (auto index) { +                    return values[index]; +                }, +                [&] (auto value) { +                    if (value > std::numeric_limits<i64>::max()) { +                        THROW_ERROR_EXCEPTION("Timestamp value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, std::numeric_limits<i64>::max()); +                    } +                    *currentOutput++ = value; +                }); +        }); +} +  void SerializeDoubleColumn(      const TTypedBatchColumn& typedColumn,      TRecordBatchSerializationContext* context) @@ -471,6 +679,35 @@ void SerializeDoubleColumn(          });  } +void SerializeFloatColumn( +    const TTypedBatchColumn& typedColumn, +    TRecordBatchSerializationContext* context) +{ +    const auto* column = typedColumn.Column; +    YT_VERIFY(column->Values); +    YT_VERIFY(column->Values->BitWidth == 32); +    YT_VERIFY(column->Values->BaseValue == 0); +    YT_VERIFY(!column->Values->ZigZagEncoded); + +    YT_LOG_DEBUG("Adding float column (ColumnId: %v, StartIndex: %v, ValueCount: %v)", +        column->Id, +        column->StartIndex, +        column->ValueCount, +        column->Rle.has_value()); + +    SerializeColumnPrologue(typedColumn, context); + +    context->AddBuffer( +        column->ValueCount * sizeof(float), +        [=] (TMutableRef dstRef) { +            auto relevantValues = column->GetRelevantTypedValues<float>(); +            ::memcpy( +                dstRef.Begin(), +                relevantValues.Begin(), +                column->ValueCount * sizeof(float)); +        }); +} +  void SerializeStringLikeColumn(      const TTypedBatchColumn& typedColumn,      TRecordBatchSerializationContext* context) @@ -584,8 +821,18 @@ void SerializeColumn(      auto simpleType = CastToV1Type(typedColumn.Type).first;      if (IsIntegralType(simpleType)) {          SerializeIntegerColumn(typedColumn, simpleType, context); +    } else if (simpleType == ESimpleLogicalValueType::Interval) { +         SerializeIntegerColumn(typedColumn, simpleType, context); +    }  else if (simpleType == ESimpleLogicalValueType::Date) { +        SerializeDateColumn(typedColumn, context); +    } else if (simpleType == ESimpleLogicalValueType::Datetime) { +        SerializeDatetimeColumn(typedColumn, context); +    } else if (simpleType == ESimpleLogicalValueType::Timestamp) { +        SerializeTimestampColumn(typedColumn, context);      } else if (simpleType == ESimpleLogicalValueType::Double) {          SerializeDoubleColumn(typedColumn, context); +    } else if (simpleType == ESimpleLogicalValueType::Float) { +        SerializeFloatColumn(typedColumn, context);      } else if (IsStringLikeType(simpleType)) {          SerializeStringLikeColumn(typedColumn, context);      } else if (simpleType == ESimpleLogicalValueType::Boolean) {  | 
