diff options
author | uzhas <uzhas@ydb.tech> | 2023-02-28 12:45:55 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2023-02-28 12:45:55 +0300 |
commit | bcf551ba5681d91971aaadb88b49316980e568c2 (patch) | |
tree | 5c681e37f2c058390add9c89b9de953c171bf4c3 | |
parent | 38ec3bde944ec1d54c28a316a1b188106145fd27 (diff) | |
download | ydb-bcf551ba5681d91971aaadb88b49316980e568c2.tar.gz |
support parquet date reading
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 91 | ||||
-rw-r--r-- | ydb/library/yql/public/udf/arrow/block_builder.h | 15 |
2 files changed, 87 insertions, 19 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 4976a9c1e64..af9ebb83bea 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -47,6 +47,8 @@ #include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> #include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> +#include <ydb/library/yql/public/udf/arrow/block_builder.h> +#include <ydb/library/yql/public/udf/arrow/block_reader.h> #include <ydb/library/yql/public/udf/arrow/util.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -974,6 +976,7 @@ struct TReadSpec { using TPtr = std::shared_ptr<TReadSpec>; bool Arrow = false; + std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec; NDB::ColumnsWithTypeAndName CHColumns; std::shared_ptr<arrow::Schema> ArrowSchema; NDB::FormatSettings Settings; @@ -1105,8 +1108,8 @@ public: if (!CurrentBatchReader) { auto future = ArrowReader->ReadRowGroup(FileDesc, CurrentGroup++, ColumnIndices); - future.Subscribe([this](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){ - OnFutureResolve(); + future.Subscribe([f = OnFutureResolve](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){ + f(); }); WaitForFutureResolve(); CurrentTable = future.GetValue(); @@ -1145,6 +1148,64 @@ private: std::function<void()> WaitForFutureResolve; }; +template <bool isOptional> +std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) { + ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length()); + ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader; + for (i64 i = 0; i < value->length(); ++i) { + const NUdf::TBlockItem item = reader.GetItem(*value->data(), i); + if constexpr (isOptional) { + if (!item) { + builder.Add(item); + continue; + } + } else if (!item) { + throw yexception() << "null value for date could not be represented in non-optional type"; + } + + const i32 v = item.As<i32>(); + if (v < 0 || v > ::NYql::NUdf::MAX_DATE) { + throw yexception() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "]: " << v; + } + builder.Add(NUdf::TBlockItem(static_cast<ui16>(v))); + } + return builder.Build(true).make_array(); +} + +TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) { + if (originalType->id() == arrow::Type::DATE32) { + // TODO: support more than 1 optional level + bool isOptional = false; + auto unpackedYqlType = UnpackOptional(yqlType, isOptional); + + // arrow date -> yql date + if (unpackedYqlType->IsData()) { + auto slot = AS_TYPE(TDataType, unpackedYqlType)->GetDataSlot(); + if (slot == NUdf::EDataSlot::Date) { + return [targetType, isOptional](const std::shared_ptr<arrow::Array>& value) { + if (isOptional) { + return ArrowDate32AsYqlDate<true>(targetType, value); + } + return ArrowDate32AsYqlDate<false>(targetType, value); + }; + } + } + } + + if (targetType->Equals(originalType)) { + return {}; + } + + YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: " + << targetType->ToString() << ", got: " << originalType->ToString()); + + return [targetType](const std::shared_ptr<arrow::Array>& value) { + auto res = arrow::compute::Cast(*value, targetType); + THROW_ARROW_NOT_OK(res.status()); + return std::move(res).ValueOrDie(); + }; +} + class TS3ReadCoroImpl : public TActorCoroImpl { friend class TS3StreamReadActor; private: @@ -1417,6 +1478,7 @@ private: std::shared_ptr<arrow::Schema> schema = result.Schema; std::vector<int> columnIndices; std::vector<TColumnConverter> columnConverters; + columnConverters.reserve(ReadSpec->ArrowSchema->num_fields()); for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) { const auto& targetField = ReadSpec->ArrowSchema->field(i); auto srcFieldIndex = schema->GetFieldIndex(targetField->name()); @@ -1425,19 +1487,10 @@ private: auto originalType = schema->field(srcFieldIndex)->type(); YQL_ENSURE(!originalType->layout().has_dictionary, "Unsupported dictionary encoding is used for field: " << targetField->name() << ", type: " << originalType->ToString()); - if (targetType->Equals(originalType)) { - columnConverters.emplace_back(); - } else { - YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << targetField->name() << ", expected: " - << targetType->ToString() << ", got: " << originalType->ToString()); - columnConverters.emplace_back([targetType](const std::shared_ptr<arrow::Array>& value) { - auto res = arrow::compute::Cast(*value, targetType); - THROW_ARROW_NOT_OK(res.status()); - return std::move(res).ValueOrDie(); - }); - } - columnIndices.push_back(srcFieldIndex); + auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name()); + YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec"); + columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second)); } fileDesc.Cookie = result.Cookie; @@ -2353,17 +2406,20 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( if (params.HasFormat() && params.HasRowType()) { const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry); const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr); + YQL_ENSURE(outputItemType->IsStruct(), "Row type is not struct"); const auto structType = static_cast<TStructType*>(outputItemType); const auto readSpec = std::make_shared<TReadSpec>(); readSpec->Arrow = params.GetArrow(); if (readSpec->Arrow) { arrow::SchemaBuilder builder; - auto extraStructType = static_cast<TStructType*>(pb->NewStructType(structType, "_yql_block_length", + const TStringBuf blockLengthColumn("_yql_block_length"sv); + auto extraStructType = static_cast<TStructType*>(pb->NewStructType(structType, blockLengthColumn, pb->NewBlockType(pb->NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Scalar))); for (ui32 i = 0U; i < extraStructType->GetMembersCount(); ++i) { - if (extraStructType->GetMemberName(i) == "_yql_block_length") { + TStringBuf memberName = extraStructType->GetMemberName(i); + if (memberName == blockLengthColumn) { readSpec->BlockLengthPosition = i; continue; } @@ -2372,8 +2428,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( std::shared_ptr<arrow::DataType> dataType; YQL_ENSURE(ConvertArrowType(memberType, dataType), "Unsupported arrow type"); - THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(extraStructType->GetMemberName(i)), dataType, memberType->IsOptional()))); + THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional()))); readSpec->ColumnReorder.push_back(i); + readSpec->RowSpec.emplace(memberName, memberType); } auto res = builder.Finish(); diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index 8113a7ee10a..95dcd61f160 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -74,8 +74,8 @@ public: std::vector<TBlockArrayTree::Ptr> Children; }; - TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) - : ArrowType(GetArrowType(typeInfoHelper, type)) + TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen) + : ArrowType(std::move(arrowType)) , Pool(&pool) , MaxLen(maxLen) , MaxBlockSizeInBytes(typeInfoHelper.GetMaxBlockBytes()) @@ -84,6 +84,11 @@ public: Y_VERIFY(maxLen > 0); } + TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) + : TArrayBuilderBase(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen) + { + } + size_t MaxLength() const final { return MaxLen; } @@ -292,6 +297,12 @@ private: template <typename T, bool Nullable> class TFixedSizeArrayBuilder : public TArrayBuilderBase { public: + TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen) + : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen) + { + Reserve(); + } + TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen) { |