aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <uzhas@ydb.tech>2023-02-28 12:45:55 +0300
committeruzhas <uzhas@ydb.tech>2023-02-28 12:45:55 +0300
commitbcf551ba5681d91971aaadb88b49316980e568c2 (patch)
tree5c681e37f2c058390add9c89b9de953c171bf4c3
parent38ec3bde944ec1d54c28a316a1b188106145fd27 (diff)
downloadydb-bcf551ba5681d91971aaadb88b49316980e568c2.tar.gz
support parquet date reading
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp91
-rw-r--r--ydb/library/yql/public/udf/arrow/block_builder.h15
2 files changed, 87 insertions, 19 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 4976a9c1e64..af9ebb83bea 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -47,6 +47,8 @@
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
+#include <ydb/library/yql/public/udf/arrow/block_builder.h>
+#include <ydb/library/yql/public/udf/arrow/block_reader.h>
#include <ydb/library/yql/public/udf/arrow/util.h>
#include <ydb/library/yql/utils/yql_panic.h>
@@ -974,6 +976,7 @@ struct TReadSpec {
using TPtr = std::shared_ptr<TReadSpec>;
bool Arrow = false;
+ std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec;
NDB::ColumnsWithTypeAndName CHColumns;
std::shared_ptr<arrow::Schema> ArrowSchema;
NDB::FormatSettings Settings;
@@ -1105,8 +1108,8 @@ public:
if (!CurrentBatchReader) {
auto future = ArrowReader->ReadRowGroup(FileDesc, CurrentGroup++, ColumnIndices);
- future.Subscribe([this](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){
- OnFutureResolve();
+ future.Subscribe([f = OnFutureResolve](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){
+ f();
});
WaitForFutureResolve();
CurrentTable = future.GetValue();
@@ -1145,6 +1148,64 @@ private:
std::function<void()> WaitForFutureResolve;
};
+// Converts an Arrow DATE32 array (i32 day counts) into an array of YQL Date values (ui16 day counts).
+//
+// isOptional selects the nullable reader/builder pair: when true, null items are copied to the
+// output as nulls; when false, a null item is rejected with yexception.
+// targetType is the Arrow type handed to the output builder; value is the source array.
+// Throws yexception when a day count lies outside the YQL Date range [0, MAX_DATE).
+template <bool isOptional>
+std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
+    // NOTE(review): the builder's base constructor checks maxLen > 0 with Y_VERIFY, so a
+    // zero-length input array would presumably abort here - confirm callers never pass one.
+    ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TFixedSizeBlockReader<i32, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+        if constexpr (isOptional) {
+            if (!item) {
+                // Propagate the null as-is; there is no day count to validate.
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw yexception() << "null value for date could not be represented in non-optional type";
+        }
+
+        const i32 v = item.As<i32>();
+        // MAX_DATE is an exclusive bound (the first day past the supported range), so the
+        // value equal to MAX_DATE must be rejected as well.
+        if (v < 0 || v >= ::NYql::NUdf::MAX_DATE) {
+            throw yexception() << "date in parquet is out of range [0, " << ::NYql::NUdf::MAX_DATE << "): " << v;
+        }
+        builder.Add(NUdf::TBlockItem(static_cast<ui16>(v)));
+    }
+    return builder.Build(true).make_array();
+}
+
+// Picks the conversion applied to a parquet column whose Arrow type is originalType so that it
+// matches targetType.
+//
+// Result, in order of precedence:
+//   - a DATE32 -> YQL Date converter when the source is DATE32 and yqlType is (optional) Date;
+//   - an empty converter when the two Arrow types are already equal;
+//   - otherwise a converter that delegates to arrow::compute::Cast.
+// YQL_ENSURE fails when Arrow reports that the cast is impossible; columnName is used only in
+// that diagnostic.
+TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) {
+    const bool sourceIsDate32 = originalType->id() == arrow::Type::DATE32;
+    if (sourceIsDate32) {
+        // TODO: support more than 1 optional level
+        bool optionalDate = false;
+        auto innerYqlType = UnpackOptional(yqlType, optionalDate);
+
+        // arrow date -> yql date
+        if (innerYqlType->IsData() && AS_TYPE(TDataType, innerYqlType)->GetDataSlot() == NUdf::EDataSlot::Date) {
+            return [targetType, optionalDate](const std::shared_ptr<arrow::Array>& column) {
+                return optionalDate
+                    ? ArrowDate32AsYqlDate<true>(targetType, column)
+                    : ArrowDate32AsYqlDate<false>(targetType, column);
+            };
+        }
+    }
+
+    // Identical types need no conversion step at all.
+    if (targetType->Equals(originalType)) {
+        return TColumnConverter{};
+    }
+
+    YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
+        << targetType->ToString() << ", got: " << originalType->ToString());
+
+    // Generic path: let Arrow perform the cast and surface any failure status.
+    return [targetType](const std::shared_ptr<arrow::Array>& column) {
+        auto castResult = arrow::compute::Cast(*column, targetType);
+        THROW_ARROW_NOT_OK(castResult.status());
+        return std::move(castResult).ValueOrDie();
+    };
+}
+
class TS3ReadCoroImpl : public TActorCoroImpl {
friend class TS3StreamReadActor;
private:
@@ -1417,6 +1478,7 @@ private:
std::shared_ptr<arrow::Schema> schema = result.Schema;
std::vector<int> columnIndices;
std::vector<TColumnConverter> columnConverters;
+ columnConverters.reserve(ReadSpec->ArrowSchema->num_fields());
for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) {
const auto& targetField = ReadSpec->ArrowSchema->field(i);
auto srcFieldIndex = schema->GetFieldIndex(targetField->name());
@@ -1425,19 +1487,10 @@ private:
auto originalType = schema->field(srcFieldIndex)->type();
YQL_ENSURE(!originalType->layout().has_dictionary, "Unsupported dictionary encoding is used for field: "
<< targetField->name() << ", type: " << originalType->ToString());
- if (targetType->Equals(originalType)) {
- columnConverters.emplace_back();
- } else {
- YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << targetField->name() << ", expected: "
- << targetType->ToString() << ", got: " << originalType->ToString());
- columnConverters.emplace_back([targetType](const std::shared_ptr<arrow::Array>& value) {
- auto res = arrow::compute::Cast(*value, targetType);
- THROW_ARROW_NOT_OK(res.status());
- return std::move(res).ValueOrDie();
- });
- }
-
columnIndices.push_back(srcFieldIndex);
+ auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
+ YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
+ columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
}
fileDesc.Cookie = result.Cookie;
@@ -2353,17 +2406,20 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (params.HasFormat() && params.HasRowType()) {
const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry);
const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb, Cerr);
+ YQL_ENSURE(outputItemType->IsStruct(), "Row type is not struct");
const auto structType = static_cast<TStructType*>(outputItemType);
const auto readSpec = std::make_shared<TReadSpec>();
readSpec->Arrow = params.GetArrow();
if (readSpec->Arrow) {
arrow::SchemaBuilder builder;
- auto extraStructType = static_cast<TStructType*>(pb->NewStructType(structType, "_yql_block_length",
+ const TStringBuf blockLengthColumn("_yql_block_length"sv);
+ auto extraStructType = static_cast<TStructType*>(pb->NewStructType(structType, blockLengthColumn,
pb->NewBlockType(pb->NewDataType(NUdf::EDataSlot::Uint64), TBlockType::EShape::Scalar)));
for (ui32 i = 0U; i < extraStructType->GetMembersCount(); ++i) {
- if (extraStructType->GetMemberName(i) == "_yql_block_length") {
+ TStringBuf memberName = extraStructType->GetMemberName(i);
+ if (memberName == blockLengthColumn) {
readSpec->BlockLengthPosition = i;
continue;
}
@@ -2372,8 +2428,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::shared_ptr<arrow::DataType> dataType;
YQL_ENSURE(ConvertArrowType(memberType, dataType), "Unsupported arrow type");
- THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(extraStructType->GetMemberName(i)), dataType, memberType->IsOptional())));
+ THROW_ARROW_NOT_OK(builder.AddField(std::make_shared<arrow::Field>(std::string(memberName), dataType, memberType->IsOptional())));
readSpec->ColumnReorder.push_back(i);
+ readSpec->RowSpec.emplace(memberName, memberType);
}
auto res = builder.Finish();
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index 8113a7ee10a..95dcd61f160 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -74,8 +74,8 @@ public:
std::vector<TBlockArrayTree::Ptr> Children;
};
- TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen)
- : ArrowType(GetArrowType(typeInfoHelper, type))
+ TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen)
+ : ArrowType(std::move(arrowType))
, Pool(&pool)
, MaxLen(maxLen)
, MaxBlockSizeInBytes(typeInfoHelper.GetMaxBlockBytes())
@@ -84,6 +84,11 @@ public:
Y_VERIFY(maxLen > 0);
}
+ TArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen)
+ : TArrayBuilderBase(typeInfoHelper, GetArrowType(typeInfoHelper, type), pool, maxLen)
+ {
+ }
+
size_t MaxLength() const final {
return MaxLen;
}
@@ -292,6 +297,12 @@ private:
template <typename T, bool Nullable>
class TFixedSizeArrayBuilder : public TArrayBuilderBase {
public:
+ TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr<arrow::DataType> arrowType, arrow::MemoryPool& pool, size_t maxLen)
+ : TArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen)
+ {
+ Reserve();
+ }
+
TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen)
: TArrayBuilderBase(typeInfoHelper, type, pool, maxLen)
{