aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-11-23 18:08:39 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-11-23 18:30:22 +0300
commit50c69e8394f3c3e1afef75878953a4c4691d4383 (patch)
tree420a48ebfc10cd90e352785bf39f125e686ab08b /yt
parent5aecdb67595db1a47c933b7d8da2cb662a50e185 (diff)
downloadydb-50c69e8394f3c3e1afef75878953a4c4691d4383.tar.gz
Intermediate changes
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/library/formats/arrow_writer.cpp261
1 files changed, 254 insertions, 7 deletions
diff --git a/yt/yt/library/formats/arrow_writer.cpp b/yt/yt/library/formats/arrow_writer.cpp
index e0aabc5b79..b4b8853571 100644
--- a/yt/yt/library/formats/arrow_writer.cpp
+++ b/yt/yt/library/formats/arrow_writer.cpp
@@ -82,6 +82,39 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
IsIntegralTypeSigned(simpleType))
.Union());
+ case ESimpleLogicalValueType::Interval:
+ return std::make_tuple(
+ org::apache::arrow::flatbuf::Type_Int,
+ org::apache::arrow::flatbuf::CreateInt(
+ *flatbufBuilder,
+ 64,
+ true)
+ .Union());
+
+ case ESimpleLogicalValueType::Date:
+ return std::make_tuple(
+ org::apache::arrow::flatbuf::Type_Date,
+ org::apache::arrow::flatbuf::CreateDate(
+ *flatbufBuilder,
+ org::apache::arrow::flatbuf::DateUnit_DAY)
+ .Union());
+
+ case ESimpleLogicalValueType::Datetime:
+ return std::make_tuple(
+ org::apache::arrow::flatbuf::Type_Date,
+ org::apache::arrow::flatbuf::CreateDate(
+ *flatbufBuilder,
+ org::apache::arrow::flatbuf::DateUnit_MILLISECOND)
+ .Union());
+
+ case ESimpleLogicalValueType::Timestamp:
+ return std::make_tuple(
+ org::apache::arrow::flatbuf::Type_Timestamp,
+ org::apache::arrow::flatbuf::CreateTimestamp(
+ *flatbufBuilder,
+ org::apache::arrow::flatbuf::TimeUnit_MICROSECOND)
+ .Union());
+
case ESimpleLogicalValueType::Double:
return std::make_tuple(
org::apache::arrow::flatbuf::Type_FloatingPoint,
@@ -90,6 +123,14 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
org::apache::arrow::flatbuf::Precision_DOUBLE)
.Union());
+ case ESimpleLogicalValueType::Float:
+ return std::make_tuple(
+ org::apache::arrow::flatbuf::Type_FloatingPoint,
+ org::apache::arrow::flatbuf::CreateFloatingPoint(
+ *flatbufBuilder,
+ org::apache::arrow::flatbuf::Precision_SINGLE)
+ .Union());
+
case ESimpleLogicalValueType::Boolean:
return std::make_tuple(
org::apache::arrow::flatbuf::Type_Bool,
@@ -109,12 +150,6 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
org::apache::arrow::flatbuf::CreateUtf8(*flatbufBuilder)
.Union());
- // TODO(babenko): the following types are not supported:
- // Date
- // Datetime
- // Interval
- // Timestamp
-
default:
THROW_ERROR_EXCEPTION("Column %v has type %Qlv that is not currently supported by Arrow encoder",
schema.GetDiagnosticNameString(),
@@ -122,6 +157,27 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
}
}
+int GetIntegralLikeTypeByteSize(ESimpleLogicalValueType type)
+{
+ switch (type) {
+ case ESimpleLogicalValueType::Int8:
+ case ESimpleLogicalValueType::Uint8:
+ return 1;
+ case ESimpleLogicalValueType::Int16:
+ case ESimpleLogicalValueType::Uint16:
+ return 2;
+ case ESimpleLogicalValueType::Int32:
+ case ESimpleLogicalValueType::Uint32:
+ return 4;
+ case ESimpleLogicalValueType::Int64:
+ case ESimpleLogicalValueType::Uint64:
+ case ESimpleLogicalValueType::Interval:
+ return 8;
+ default:
+ YT_ABORT();
+ }
+}
+
bool IsRleButNotDictionaryEncodedStringLikeColumn(const TBatchColumn& column)
{
auto simpleType = CastToV1Type(column.Type).first;
@@ -389,7 +445,7 @@ void SerializeIntegerColumn(
SerializeColumnPrologue(typedColumn, context);
context->AddBuffer(
- column->ValueCount * GetIntegralTypeByteSize(simpleType),
+ column->ValueCount * GetIntegralLikeTypeByteSize(simpleType),
[=] (TMutableRef dstRef) {
const auto* valueColumn = column->Rle
? column->Rle->ValueColumn
@@ -431,6 +487,7 @@ void SerializeIntegerColumn(
XX(ui16, Uint16)
XX(ui32, Uint32)
XX(ui64, Uint64)
+ XX(i64, Interval)
#undef XX
@@ -442,6 +499,157 @@ void SerializeIntegerColumn(
});
}
+void SerializeDateColumn(
+ const TTypedBatchColumn& typedColumn,
+ TRecordBatchSerializationContext* context)
+{
+ const auto* column = typedColumn.Column;
+ YT_VERIFY(column->Values);
+
+ YT_LOG_DEBUG("Adding data column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)",
+ column->Id,
+ column->StartIndex,
+ column->ValueCount,
+ column->Rle.has_value());
+
+ SerializeColumnPrologue(typedColumn, context);
+
+ context->AddBuffer(
+ column->ValueCount * sizeof(i32),
+ [=] (TMutableRef dstRef) {
+ const auto* valueColumn = column->Rle
+ ? column->Rle->ValueColumn
+ : column;
+ auto values = valueColumn->GetTypedValues<ui64>();
+
+ auto rleIndexes = column->Rle
+ ? column->GetTypedValues<ui64>()
+ : TRange<ui64>();
+
+ auto startIndex = column->StartIndex;
+
+ auto dstValues = GetTypedValues<i32>(dstRef);
+ auto* currentOutput = dstValues.Begin();
+ DecodeIntegerVector(
+ startIndex,
+ startIndex + column->ValueCount,
+ valueColumn->Values->BaseValue,
+ valueColumn->Values->ZigZagEncoded,
+ TRange<ui32>(),
+ rleIndexes,
+ [&] (auto index) {
+ return values[index];
+ },
+ [&] (auto value) {
+ if (value > std::numeric_limits<i32>::max()) {
+ THROW_ERROR_EXCEPTION("Date value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, std::numeric_limits<i32>::max());
+ }
+ *currentOutput++ = value;
+ });
+ });
+}
+
+void SerializeDatetimeColumn(
+ const TTypedBatchColumn& typedColumn,
+ TRecordBatchSerializationContext* context)
+{
+ const auto* column = typedColumn.Column;
+ const auto maxAllowedValue = std::numeric_limits<i64>::max() / 1000;
+ YT_VERIFY(column->Values);
+
+ YT_LOG_DEBUG("Adding datetime column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)",
+ column->Id,
+ column->StartIndex,
+ column->ValueCount,
+ column->Rle.has_value());
+
+ SerializeColumnPrologue(typedColumn, context);
+
+ context->AddBuffer(
+ column->ValueCount * sizeof(i64),
+ [=] (TMutableRef dstRef) {
+ const auto* valueColumn = column->Rle
+ ? column->Rle->ValueColumn
+ : column;
+ auto values = valueColumn->GetTypedValues<ui64>();
+
+ auto rleIndexes = column->Rle
+ ? column->GetTypedValues<ui64>()
+ : TRange<ui64>();
+
+ auto startIndex = column->StartIndex;
+
+ auto dstValues = GetTypedValues<i64>(dstRef);
+ auto* currentOutput = dstValues.Begin();
+ DecodeIntegerVector(
+ startIndex,
+ startIndex + column->ValueCount,
+ valueColumn->Values->BaseValue,
+ valueColumn->Values->ZigZagEncoded,
+ TRange<ui32>(),
+ rleIndexes,
+ [&] (auto index) {
+ return values[index];
+ },
+ [&] (auto value) {
+ if(value > maxAllowedValue) {
+ THROW_ERROR_EXCEPTION("Datetime value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, maxAllowedValue);
+ }
+ *currentOutput++ = value * 1000;
+ });
+ });
+}
+
+void SerializeTimestampColumn(
+ const TTypedBatchColumn& typedColumn,
+ TRecordBatchSerializationContext* context)
+{
+ const auto* column = typedColumn.Column;
+ YT_VERIFY(column->Values);
+
+ YT_LOG_DEBUG("Adding timestamp column (ColumnId: %v, StartIndex: %v, ValueCount: %v, Rle: %v)",
+ column->Id,
+ column->StartIndex,
+ column->ValueCount,
+ column->Rle.has_value());
+
+ SerializeColumnPrologue(typedColumn, context);
+
+ context->AddBuffer(
+ column->ValueCount * sizeof(i64),
+ [=] (TMutableRef dstRef) {
+ const auto* valueColumn = column->Rle
+ ? column->Rle->ValueColumn
+ : column;
+ auto values = valueColumn->GetTypedValues<ui64>();
+
+ auto rleIndexes = column->Rle
+ ? column->GetTypedValues<ui64>()
+ : TRange<ui64>();
+
+ auto startIndex = column->StartIndex;
+
+ auto dstValues = GetTypedValues<i64>(dstRef);
+ auto* currentOutput = dstValues.Begin();
+ DecodeIntegerVector(
+ startIndex,
+ startIndex + column->ValueCount,
+ valueColumn->Values->BaseValue,
+ valueColumn->Values->ZigZagEncoded,
+ TRange<ui32>(),
+ rleIndexes,
+ [&] (auto index) {
+ return values[index];
+ },
+ [&] (auto value) {
+ if (value > std::numeric_limits<i64>::max()) {
+ THROW_ERROR_EXCEPTION("Timestamp value cannot be represented in arrow (Value: %v, MaxAllowedValue: %v)", value, std::numeric_limits<i64>::max());
+ }
+ *currentOutput++ = value;
+ });
+ });
+}
+
void SerializeDoubleColumn(
const TTypedBatchColumn& typedColumn,
TRecordBatchSerializationContext* context)
@@ -471,6 +679,35 @@ void SerializeDoubleColumn(
});
}
+void SerializeFloatColumn(
+ const TTypedBatchColumn& typedColumn,
+ TRecordBatchSerializationContext* context)
+{
+ const auto* column = typedColumn.Column;
+ YT_VERIFY(column->Values);
+ YT_VERIFY(column->Values->BitWidth == 32);
+ YT_VERIFY(column->Values->BaseValue == 0);
+ YT_VERIFY(!column->Values->ZigZagEncoded);
+
+ YT_LOG_DEBUG("Adding float column (ColumnId: %v, StartIndex: %v, ValueCount: %v)",
+ column->Id,
+ column->StartIndex,
+ column->ValueCount,
+ column->Rle.has_value());
+
+ SerializeColumnPrologue(typedColumn, context);
+
+ context->AddBuffer(
+ column->ValueCount * sizeof(float),
+ [=] (TMutableRef dstRef) {
+ auto relevantValues = column->GetRelevantTypedValues<float>();
+ ::memcpy(
+ dstRef.Begin(),
+ relevantValues.Begin(),
+ column->ValueCount * sizeof(float));
+ });
+}
+
void SerializeStringLikeColumn(
const TTypedBatchColumn& typedColumn,
TRecordBatchSerializationContext* context)
@@ -584,8 +821,18 @@ void SerializeColumn(
auto simpleType = CastToV1Type(typedColumn.Type).first;
if (IsIntegralType(simpleType)) {
SerializeIntegerColumn(typedColumn, simpleType, context);
+ } else if (simpleType == ESimpleLogicalValueType::Interval) {
+ SerializeIntegerColumn(typedColumn, simpleType, context);
+ } else if (simpleType == ESimpleLogicalValueType::Date) {
+ SerializeDateColumn(typedColumn, context);
+ } else if (simpleType == ESimpleLogicalValueType::Datetime) {
+ SerializeDatetimeColumn(typedColumn, context);
+ } else if (simpleType == ESimpleLogicalValueType::Timestamp) {
+ SerializeTimestampColumn(typedColumn, context);
} else if (simpleType == ESimpleLogicalValueType::Double) {
SerializeDoubleColumn(typedColumn, context);
+ } else if (simpleType == ESimpleLogicalValueType::Float) {
+ SerializeFloatColumn(typedColumn, context);
} else if (IsStringLikeType(simpleType)) {
SerializeStringLikeColumn(typedColumn, context);
} else if (simpleType == ESimpleLogicalValueType::Boolean) {