diff options
author | robot-piglet <[email protected]> | 2025-08-13 15:07:18 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-08-13 15:17:54 +0300 |
commit | e4e1a3b6572088117555cc2b5b8034eb8dceeab4 (patch) | |
tree | 3341c9e18341fa4a599eab75e434bb26017109b1 | |
parent | 6fb889595016b3b8f4d4529fda53e44cdb682532 (diff) |
Intermediate changes
commit_hash:d32958cb4cafd844339c5c2de1f71b038b04e7c1
-rw-r--r-- | yt/yt/library/formats/arrow_parser.cpp | 131 | ||||
-rw-r--r-- | yt/yt/library/formats/arrow_writer.cpp | 12 |
2 files changed, 120 insertions, 23 deletions
diff --git a/yt/yt/library/formats/arrow_parser.cpp b/yt/yt/library/formats/arrow_parser.cpp index 1d8b848ab6d..ac4422e4803 100644 --- a/yt/yt/library/formats/arrow_parser.cpp +++ b/yt/yt/library/formats/arrow_parser.cpp @@ -46,7 +46,9 @@ namespace { static constexpr i64 SecondsToMicroCoefficient = 1'000'000; static constexpr i64 MilliToMicroCoefficient = 1'000; static constexpr i64 MicroToNanoCoefficient = 1'000; + static constexpr i64 SecondsToMilliCoefficient = 1'000; +static constexpr i64 SecondsToNanoCoefficient = 1'000'000'000; //////////////////////////////////////////////////////////////////////////////// @@ -297,6 +299,7 @@ void CheckArrowTypeMatch( arrow::Type::UINT32, arrow::Type::UINT64, arrow::Type::DATE64, + arrow::Type::TIMESTAMP, }, arrowTypeName, arrowTypeId); @@ -343,6 +346,7 @@ void CheckArrowTypeMatch( arrow::Type::INT32, arrow::Type::INT64, arrow::Type::DATE64, + arrow::Type::TIMESTAMP, }, arrowTypeName, arrowTypeId); @@ -524,7 +528,7 @@ i64 CheckAndTransformDate(i64 arrowValue, i64 minAllowedDate, i64 maxAllowedDate return arrowValue; } -i64 CheckAndTransformDatetime(i64 arrowValue, i64 minAllowedDate, i64 maxAllowedDate) +i64 CheckAndTransformDateToDatetime(i64 arrowValue, i64 minAllowedDate, i64 maxAllowedDate) { auto minarrowValue = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToMilliCoefficient); auto maxarrowValue = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToMilliCoefficient); @@ -539,6 +543,51 @@ i64 CheckAndTransformDatetime(i64 arrowValue, i64 minAllowedDate, i64 maxAllowed return arrowValue / SecondsToMilliCoefficient; } +i64 CheckAndTransformDatetime(i64 arrowValue, arrow::TimeUnit::type timeUnit, i64 minAllowedDate, i64 maxAllowedDate) +{ + i64 resultValue; + i64 minArrowAllowedDatetime; + i64 maxArrowAllowedDatetime; + + switch (timeUnit) { + case arrow::TimeUnit::type::SECOND: + resultValue = arrowValue; + minArrowAllowedDatetime = minAllowedDate; + maxArrowAllowedDatetime = maxAllowedDate; + break; + + case arrow::TimeUnit::type::MILLI: + resultValue = arrowValue / SecondsToMilliCoefficient; + minArrowAllowedDatetime = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToMilliCoefficient); + maxArrowAllowedDatetime = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToMilliCoefficient); + break; + + case arrow::TimeUnit::type::MICRO: + resultValue = arrowValue / SecondsToMicroCoefficient; + minArrowAllowedDatetime = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToMicroCoefficient); + maxArrowAllowedDatetime = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToMicroCoefficient); + break; + + case arrow::TimeUnit::type::NANO: + resultValue = arrowValue / SecondsToNanoCoefficient; + minArrowAllowedDatetime = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToNanoCoefficient); + maxArrowAllowedDatetime = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToNanoCoefficient); + break; + default: + THROW_ERROR_EXCEPTION("Unexpected arrow time unit %Qv", static_cast<int>(timeUnit)); + } + + if (resultValue < minArrowAllowedDatetime || resultValue > maxArrowAllowedDatetime) { + THROW_ERROR_EXCEPTION( + "Arrow timestamp value %v is incompatible with the YT datetime type, value should be in range [%v, %v]", + arrowValue, + minArrowAllowedDatetime, + maxArrowAllowedDatetime); + } + + return resultValue; +} + i64 CheckAndTransformTimestamp(i64 arrowValue, arrow::TimeUnit::type timeUnit, i64 minAllowedTimestamp, i64 maxAllowedTimestamp) { i64 resultValue; @@ -555,13 +604,13 @@ i64 CheckAndTransformTimestamp(i64 arrowValue, arrow::TimeUnit::type timeUnit, i case arrow::TimeUnit::type::SECOND: resultValue = SignedSaturationArithmeticMultiply(arrowValue, SecondsToMicroCoefficient); minArrowAllowedTimestamp = minAllowedTimestamp / SecondsToMicroCoefficient; - maxArrowAllowedTimestamp = maxAllowedTimestamp /SecondsToMicroCoefficient; + maxArrowAllowedTimestamp = maxAllowedTimestamp / SecondsToMicroCoefficient; break; case arrow::TimeUnit::type::MILLI: resultValue = SignedSaturationArithmeticMultiply(arrowValue, MilliToMicroCoefficient); minArrowAllowedTimestamp = minAllowedTimestamp / MilliToMicroCoefficient; - maxArrowAllowedTimestamp = maxAllowedTimestamp /MilliToMicroCoefficient; + maxArrowAllowedTimestamp = maxAllowedTimestamp / MilliToMicroCoefficient; break; case arrow::TimeUnit::type::MICRO: @@ -649,9 +698,9 @@ public: arrow::Status Visit(const arrow::Date64Type& /*type*/) override { if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime64) { - return ParseDate64<arrow::Date64Array>(); + return ParseDate64ToDatetime64<arrow::Date64Array>(); } else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime) { - return ParseDatetime<arrow::Date64Array>(); + return ParseDate64ToDatetime<arrow::Date64Array>(); } else { return ParseInt64<arrow::Date64Array>(); } @@ -663,6 +712,10 @@ public: return ParseTimestamp64<arrow::TimestampArray>(type.unit()); } else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Timestamp) { return ParseTimestamp<arrow::TimestampArray>(type.unit()); + } else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime) { + return ParseDatetime<arrow::TimestampArray>(type.unit()); + } else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime64) { + return ParseDatetime64<arrow::TimestampArray>(type.unit()); } else { return ParseInt64<arrow::TimestampArray>(); } @@ -796,11 +849,11 @@ private: } template <typename ArrayType> - arrow::Status ParseDatetime() + arrow::Status ParseDate64ToDatetime() { auto makeUnversionedValue = [] (i64 value, i64 columnId) { return MakeUnversionedUint64Value( - static_cast<ui64>(CheckAndTransformDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound)), + static_cast<ui64>(CheckAndTransformDateToDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound)), columnId); }; ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue); @@ -808,10 +861,34 @@ private: } template <typename ArrayType> - arrow::Status ParseDate64() + arrow::Status ParseDate64ToDatetime64() { auto makeUnversionedValue = [] (i64 value, i64 columnId) { - return MakeUnversionedInt64Value(CheckAndTransformDatetime(value, Datetime64LowerBound, DatetimeUpperBound), columnId); + return MakeUnversionedInt64Value(CheckAndTransformDateToDatetime(value, Datetime64LowerBound, DatetimeUpperBound), columnId); + }; + ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue); + return arrow::Status::OK(); + } + + template <typename ArrayType> + arrow::Status ParseDatetime(arrow::TimeUnit::type timeUnit) + { + auto makeUnversionedValue = [timeUnit] (i64 value, i64 columnId) { + return MakeUnversionedUint64Value( + static_cast<ui64>(CheckAndTransformDatetime(value, timeUnit, /*minAllowedDate*/ 0, DatetimeUpperBound)), + columnId); + }; + ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue); + return arrow::Status::OK(); + } + + template <typename ArrayType> + arrow::Status ParseDatetime64(arrow::TimeUnit::type timeUnit) + { + auto makeUnversionedValue = [timeUnit] (i64 value, i64 columnId) { + return MakeUnversionedInt64Value( + CheckAndTransformDatetime(value, timeUnit, Datetime64LowerBound, DatetimeUpperBound), + columnId); }; ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue); return arrow::Status::OK(); @@ -1083,9 +1160,9 @@ public: { CheckArrowTypeMatch(YTType_->AsSimpleTypeRef().GetElement(), type.type_name(), Array_->type()); if (YTType_->AsSimpleTypeRef().GetElement()== ESimpleLogicalValueType::Datetime64) { - return ParseDate64<arrow::Date64Array>(); + return ParseDate64ToDatetime64<arrow::Date64Array>(); } else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Datetime) { - return ParseDatetime<arrow::Date64Array>(); + return ParseDate64ToDatetime<arrow::Date64Array>(); } else { return ParseInt64<arrow::Date64Array>(); } @@ -1098,6 +1175,10 @@ public: return ParseTimestamp64<arrow::TimestampArray>(type.unit()); } else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Timestamp) { return ParseTimestamp<arrow::TimestampArray>(type.unit()); + } else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Datetime) { + return ParseDatetime<arrow::TimestampArray>(type.unit()); + } else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Datetime64) { + return ParseDatetime64<arrow::TimestampArray>(type.unit()); } else { return ParseInt64<arrow::TimestampArray>(); } @@ -1315,20 +1396,40 @@ private: } template <typename ArrayType> - arrow::Status ParseDatetime() + arrow::Status ParseDate64ToDatetime() { auto writeNumericValue = [] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) { - writer->WriteBinaryUint64(CheckAndTransformDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound)); + writer->WriteBinaryUint64(CheckAndTransformDateToDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound)); }; ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue); return arrow::Status::OK(); } template <typename ArrayType> - arrow::Status ParseDate64() + arrow::Status ParseDate64ToDatetime64() { auto writeNumericValue = [] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) { - writer->WriteBinaryInt64(CheckAndTransformDatetime(value, Datetime64LowerBound, Datetime64UpperBound)); + writer->WriteBinaryInt64(CheckAndTransformDateToDatetime(value, Datetime64LowerBound, Datetime64UpperBound)); + }; + ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue); + return arrow::Status::OK(); + } + + template <typename ArrayType> + arrow::Status ParseDatetime(arrow::TimeUnit::type timeUnit) + { + auto writeNumericValue = [timeUnit] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) { + writer->WriteBinaryUint64(CheckAndTransformDatetime(value, timeUnit, /*minAllowedDate*/ 0, DatetimeUpperBound)); + }; + ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue); + return arrow::Status::OK(); + } + + template <typename ArrayType> + arrow::Status ParseDatetime64(arrow::TimeUnit::type timeUnit) + { + auto writeNumericValue = [timeUnit] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) { + writer->WriteBinaryInt64(CheckAndTransformDatetime(value, timeUnit, Datetime64LowerBound, Datetime64UpperBound)); }; ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue); return arrow::Status::OK(); diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp index b186d1580b4..726a17c2f5c 100644 --- a/yt/yt/library/formats/arrow_writer.cpp +++ b/yt/yt/library/formats/arrow_writer.cpp @@ -173,10 +173,10 @@ std::tuple<flatbuf::Type, flatbuffers::Offset<void>, std::vector<flatbuffers::Of case ESimpleLogicalValueType::Datetime: return std::tuple( - flatbuf::Type::Date, - flatbuf::CreateDate( + flatbuf::Type::Timestamp, + flatbuf::CreateTimestamp( *flatbufBuilder, - flatbuf::DateUnit::MILLISECOND) + flatbuf::TimeUnit::SECOND) .Union(), std::vector<flatbuffers::Offset<flatbuf::Field>>()); @@ -703,7 +703,6 @@ void SerializeDatetimeColumn( TRecordBatchSerializationContext* context) { const auto* column = typedColumn.Column; - const auto maxAllowedValue = std::numeric_limits<i64>::max() / 1000; YT_VERIFY(column->Values); YT_LOG_DEBUG("Adding datetime column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)", @@ -747,10 +746,7 @@ void SerializeDatetimeColumn( return values[index]; }, [&] (auto value) { - if (value > maxAllowedValue) { - THROW_ERROR_EXCEPTION("Datetime value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, maxAllowedValue); - } - *currentOutput++ = value * 1000; + *currentOutput++ = value; }); }); } |