summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-08-13 15:07:18 +0300
committerrobot-piglet <[email protected]>2025-08-13 15:17:54 +0300
commite4e1a3b6572088117555cc2b5b8034eb8dceeab4 (patch)
tree3341c9e18341fa4a599eab75e434bb26017109b1
parent6fb889595016b3b8f4d4529fda53e44cdb682532 (diff)
Intermediate changes
commit_hash:d32958cb4cafd844339c5c2de1f71b038b04e7c1
-rw-r--r--yt/yt/library/formats/arrow_parser.cpp131
-rw-r--r--yt/yt/library/formats/arrow_writer.cpp12
2 files changed, 120 insertions, 23 deletions
diff --git a/yt/yt/library/formats/arrow_parser.cpp b/yt/yt/library/formats/arrow_parser.cpp
index 1d8b848ab6d..ac4422e4803 100644
--- a/yt/yt/library/formats/arrow_parser.cpp
+++ b/yt/yt/library/formats/arrow_parser.cpp
@@ -46,7 +46,9 @@ namespace {
static constexpr i64 SecondsToMicroCoefficient = 1'000'000;
static constexpr i64 MilliToMicroCoefficient = 1'000;
static constexpr i64 MicroToNanoCoefficient = 1'000;
+
static constexpr i64 SecondsToMilliCoefficient = 1'000;
+static constexpr i64 SecondsToNanoCoefficient = 1'000'000'000;
////////////////////////////////////////////////////////////////////////////////
@@ -297,6 +299,7 @@ void CheckArrowTypeMatch(
arrow::Type::UINT32,
arrow::Type::UINT64,
arrow::Type::DATE64,
+ arrow::Type::TIMESTAMP,
},
arrowTypeName,
arrowTypeId);
@@ -343,6 +346,7 @@ void CheckArrowTypeMatch(
arrow::Type::INT32,
arrow::Type::INT64,
arrow::Type::DATE64,
+ arrow::Type::TIMESTAMP,
},
arrowTypeName,
arrowTypeId);
@@ -524,7 +528,7 @@ i64 CheckAndTransformDate(i64 arrowValue, i64 minAllowedDate, i64 maxAllowedDate
return arrowValue;
}
-i64 CheckAndTransformDatetime(i64 arrowValue, i64 minAllowedDate, i64 maxAllowedDate)
+i64 CheckAndTransformDateToDatetime(i64 arrowValue, i64 minAllowedDate, i64 maxAllowedDate)
{
auto minarrowValue = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToMilliCoefficient);
auto maxarrowValue = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToMilliCoefficient);
@@ -539,6 +543,51 @@ i64 CheckAndTransformDatetime(i64 arrowValue, i64 minAllowedDate, i64 maxAllowed
return arrowValue / SecondsToMilliCoefficient;
}
+i64 CheckAndTransformDatetime(i64 arrowValue, arrow::TimeUnit::type timeUnit, i64 minAllowedDate, i64 maxAllowedDate)
+{
+ i64 resultValue;
+ i64 minArrowAllowedDatetime;
+ i64 maxArrowAllowedDatetime;
+
+ switch (timeUnit) {
+ case arrow::TimeUnit::type::SECOND:
+ resultValue = arrowValue;
+ minArrowAllowedDatetime = minAllowedDate;
+ maxArrowAllowedDatetime = maxAllowedDate;
+ break;
+
+ case arrow::TimeUnit::type::MILLI:
+ resultValue = arrowValue / SecondsToMilliCoefficient;
+ minArrowAllowedDatetime = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToMilliCoefficient);
+ maxArrowAllowedDatetime = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToMilliCoefficient);
+ break;
+
+ case arrow::TimeUnit::type::MICRO:
+ resultValue = arrowValue / SecondsToMicroCoefficient;
+ minArrowAllowedDatetime = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToMicroCoefficient);
+ maxArrowAllowedDatetime = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToMicroCoefficient);
+ break;
+
+ case arrow::TimeUnit::type::NANO:
+ resultValue = arrowValue / SecondsToNanoCoefficient;
+ minArrowAllowedDatetime = SignedSaturationArithmeticMultiply(minAllowedDate, SecondsToNanoCoefficient);
+ maxArrowAllowedDatetime = SignedSaturationArithmeticMultiply(maxAllowedDate, SecondsToNanoCoefficient);
+ break;
+ default:
+ THROW_ERROR_EXCEPTION("Unexpected arrow time unit %Qv", static_cast<int>(timeUnit));
+ }
+
+ if (resultValue < minArrowAllowedDatetime || resultValue > maxArrowAllowedDatetime) {
+ THROW_ERROR_EXCEPTION(
+ "Arrow timestamp value %v is incompatible with the YT datetime type, value should be in range [%v, %v]",
+ arrowValue,
+ minArrowAllowedDatetime,
+ maxArrowAllowedDatetime);
+ }
+
+ return resultValue;
+}
+
i64 CheckAndTransformTimestamp(i64 arrowValue, arrow::TimeUnit::type timeUnit, i64 minAllowedTimestamp, i64 maxAllowedTimestamp)
{
i64 resultValue;
@@ -555,13 +604,13 @@ i64 CheckAndTransformTimestamp(i64 arrowValue, arrow::TimeUnit::type timeUnit, i
case arrow::TimeUnit::type::SECOND:
resultValue = SignedSaturationArithmeticMultiply(arrowValue, SecondsToMicroCoefficient);
minArrowAllowedTimestamp = minAllowedTimestamp / SecondsToMicroCoefficient;
- maxArrowAllowedTimestamp = maxAllowedTimestamp /SecondsToMicroCoefficient;
+ maxArrowAllowedTimestamp = maxAllowedTimestamp / SecondsToMicroCoefficient;
break;
case arrow::TimeUnit::type::MILLI:
resultValue = SignedSaturationArithmeticMultiply(arrowValue, MilliToMicroCoefficient);
minArrowAllowedTimestamp = minAllowedTimestamp / MilliToMicroCoefficient;
- maxArrowAllowedTimestamp = maxAllowedTimestamp /MilliToMicroCoefficient;
+ maxArrowAllowedTimestamp = maxAllowedTimestamp / MilliToMicroCoefficient;
break;
case arrow::TimeUnit::type::MICRO:
@@ -649,9 +698,9 @@ public:
arrow::Status Visit(const arrow::Date64Type& /*type*/) override
{
if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime64) {
- return ParseDate64<arrow::Date64Array>();
+ return ParseDate64ToDatetime64<arrow::Date64Array>();
} else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime) {
- return ParseDatetime<arrow::Date64Array>();
+ return ParseDate64ToDatetime<arrow::Date64Array>();
} else {
return ParseInt64<arrow::Date64Array>();
}
@@ -663,6 +712,10 @@ public:
return ParseTimestamp64<arrow::TimestampArray>(type.unit());
} else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Timestamp) {
return ParseTimestamp<arrow::TimestampArray>(type.unit());
+ } else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime) {
+ return ParseDatetime<arrow::TimestampArray>(type.unit());
+ } else if (ColumnType_ && *ColumnType_ == ESimpleLogicalValueType::Datetime64) {
+ return ParseDatetime64<arrow::TimestampArray>(type.unit());
} else {
return ParseInt64<arrow::TimestampArray>();
}
@@ -796,11 +849,11 @@ private:
}
template <typename ArrayType>
- arrow::Status ParseDatetime()
+ arrow::Status ParseDate64ToDatetime()
{
auto makeUnversionedValue = [] (i64 value, i64 columnId) {
return MakeUnversionedUint64Value(
- static_cast<ui64>(CheckAndTransformDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound)),
+ static_cast<ui64>(CheckAndTransformDateToDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound)),
columnId);
};
ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue);
@@ -808,10 +861,34 @@ private:
}
template <typename ArrayType>
- arrow::Status ParseDate64()
+ arrow::Status ParseDate64ToDatetime64()
{
auto makeUnversionedValue = [] (i64 value, i64 columnId) {
- return MakeUnversionedInt64Value(CheckAndTransformDatetime(value, Datetime64LowerBound, DatetimeUpperBound), columnId);
+ return MakeUnversionedInt64Value(CheckAndTransformDateToDatetime(value, Datetime64LowerBound, DatetimeUpperBound), columnId);
+ };
+ ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue);
+ return arrow::Status::OK();
+ }
+
+ template <typename ArrayType>
+ arrow::Status ParseDatetime(arrow::TimeUnit::type timeUnit)
+ {
+ auto makeUnversionedValue = [timeUnit] (i64 value, i64 columnId) {
+ return MakeUnversionedUint64Value(
+ static_cast<ui64>(CheckAndTransformDatetime(value, timeUnit, /*minAllowedDate*/ 0, DatetimeUpperBound)),
+ columnId);
+ };
+ ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue);
+ return arrow::Status::OK();
+ }
+
+ template <typename ArrayType>
+ arrow::Status ParseDatetime64(arrow::TimeUnit::type timeUnit)
+ {
+ auto makeUnversionedValue = [timeUnit] (i64 value, i64 columnId) {
+ return MakeUnversionedInt64Value(
+ CheckAndTransformDatetime(value, timeUnit, Datetime64LowerBound, DatetimeUpperBound),
+ columnId);
};
ParseSimpleNumeric<ArrayType, decltype(makeUnversionedValue)>(makeUnversionedValue);
return arrow::Status::OK();
@@ -1083,9 +1160,9 @@ public:
{
CheckArrowTypeMatch(YTType_->AsSimpleTypeRef().GetElement(), type.type_name(), Array_->type());
if (YTType_->AsSimpleTypeRef().GetElement()== ESimpleLogicalValueType::Datetime64) {
- return ParseDate64<arrow::Date64Array>();
+ return ParseDate64ToDatetime64<arrow::Date64Array>();
} else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Datetime) {
- return ParseDatetime<arrow::Date64Array>();
+ return ParseDate64ToDatetime<arrow::Date64Array>();
} else {
return ParseInt64<arrow::Date64Array>();
}
@@ -1098,6 +1175,10 @@ public:
return ParseTimestamp64<arrow::TimestampArray>(type.unit());
} else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Timestamp) {
return ParseTimestamp<arrow::TimestampArray>(type.unit());
+ } else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Datetime) {
+ return ParseDatetime<arrow::TimestampArray>(type.unit());
+ } else if (YTType_->AsSimpleTypeRef().GetElement() == ESimpleLogicalValueType::Datetime64) {
+ return ParseDatetime64<arrow::TimestampArray>(type.unit());
} else {
return ParseInt64<arrow::TimestampArray>();
}
@@ -1315,20 +1396,40 @@ private:
}
template <typename ArrayType>
- arrow::Status ParseDatetime()
+ arrow::Status ParseDate64ToDatetime()
{
auto writeNumericValue = [] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) {
- writer->WriteBinaryUint64(CheckAndTransformDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound));
+ writer->WriteBinaryUint64(CheckAndTransformDateToDatetime(value, /*minAllowedDate*/ 0, DatetimeUpperBound));
};
ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue);
return arrow::Status::OK();
}
template <typename ArrayType>
- arrow::Status ParseDate64()
+ arrow::Status ParseDate64ToDatetime64()
{
auto writeNumericValue = [] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) {
- writer->WriteBinaryInt64(CheckAndTransformDatetime(value, Datetime64LowerBound, Datetime64UpperBound));
+ writer->WriteBinaryInt64(CheckAndTransformDateToDatetime(value, Datetime64LowerBound, Datetime64UpperBound));
+ };
+ ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue);
+ return arrow::Status::OK();
+ }
+
+ template <typename ArrayType>
+ arrow::Status ParseDatetime(arrow::TimeUnit::type timeUnit)
+ {
+ auto writeNumericValue = [timeUnit] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) {
+ writer->WriteBinaryUint64(CheckAndTransformDatetime(value, timeUnit, /*minAllowedDate*/ 0, DatetimeUpperBound));
+ };
+ ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue);
+ return arrow::Status::OK();
+ }
+
+ template <typename ArrayType>
+ arrow::Status ParseDatetime64(arrow::TimeUnit::type timeUnit)
+ {
+ auto writeNumericValue = [timeUnit] (NYson::TCheckedInDebugYsonTokenWriter* writer, i64 value) {
+ writer->WriteBinaryInt64(CheckAndTransformDatetime(value, timeUnit, Datetime64LowerBound, Datetime64UpperBound));
};
ParseComplexNumeric<ArrayType, decltype(writeNumericValue)>(writeNumericValue);
return arrow::Status::OK();
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp
index b186d1580b4..726a17c2f5c 100644
--- a/yt/yt/library/formats/arrow_writer.cpp
+++ b/yt/yt/library/formats/arrow_writer.cpp
@@ -173,10 +173,10 @@ std::tuple<flatbuf::Type, flatbuffers::Offset<void>, std::vector<flatbuffers::Of
case ESimpleLogicalValueType::Datetime:
return std::tuple(
- flatbuf::Type::Date,
- flatbuf::CreateDate(
+ flatbuf::Type::Timestamp,
+ flatbuf::CreateTimestamp(
*flatbufBuilder,
- flatbuf::DateUnit::MILLISECOND)
+ flatbuf::TimeUnit::SECOND)
.Union(),
std::vector<flatbuffers::Offset<flatbuf::Field>>());
@@ -703,7 +703,6 @@ void SerializeDatetimeColumn(
TRecordBatchSerializationContext* context)
{
const auto* column = typedColumn.Column;
- const auto maxAllowedValue = std::numeric_limits<i64>::max() / 1000;
YT_VERIFY(column->Values);
YT_LOG_DEBUG("Adding datetime column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)",
@@ -747,10 +746,7 @@ void SerializeDatetimeColumn(
return values[index];
},
[&] (auto value) {
- if (value > maxAllowedValue) {
- THROW_ERROR_EXCEPTION("Datetime value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, maxAllowedValue);
- }
- *currentOutput++ = value * 1000;
+ *currentOutput++ = value;
});
});
}