diff options
author | aneporada <aneporada@yandex-team.com> | 2024-11-12 21:53:49 +0300 |
---|---|---|
committer | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-11-12 22:40:29 +0300 |
commit | 7d04dad87f5a9cccc95be049327b57c915eded63 (patch) | |
tree | 5f86b91f6a2484736ef79d45b4d1b5379c895134 | |
parent | db924b116e251d6b6127b94bd10ae875d5a4b488 (diff) | |
download | ydb-7d04dad87f5a9cccc95be049327b57c915eded63.tar.gz |
Merge PR #10587: Block input for YT map operations
initial
commit_hash:61c8442fd8a0ebe277511b5d98b334cf6bc95337
6 files changed, 189 insertions, 0 deletions
diff --git a/yql/essentials/core/yql_opt_utils.cpp b/yql/essentials/core/yql_opt_utils.cpp index 9b39558176..4e9079e9ae 100644 --- a/yql/essentials/core/yql_opt_utils.cpp +++ b/yql/essentials/core/yql_opt_utils.cpp @@ -2216,5 +2216,95 @@ TVector<TString> GenNoClashColumns(const TStructExprType& source, TStringBuf pre return result; } +bool CheckSupportedTypes(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler) { + TSet<ETypeAnnotationKind> supported; + for (const auto &e: supportedTypes) { + if (e == "pg") { + supported.insert(ETypeAnnotationKind::Pg); + } else if (e == "tuple") { + supported.emplace(ETypeAnnotationKind::Tuple); + } else if (e == "struct") { + supported.emplace(ETypeAnnotationKind::Struct); + } else if (e == "dict") { + supported.emplace(ETypeAnnotationKind::Dict); + } else if (e == "list") { + supported.emplace(ETypeAnnotationKind::List); + } else if (e == "variant") { + supported.emplace(ETypeAnnotationKind::Variant); + } else { + // Unknown type + unsupportedTypeHandler(TStringBuilder() << "unknown type: " << e); + return false; + } + } + if (supportedDataTypes.size()) { + supported.emplace(ETypeAnnotationKind::Data); + } + auto checkType = [&] (const TTypeAnnotationNode* type) { + if (type->GetKind() == ETypeAnnotationKind::Data) { + if (!supported.contains(ETypeAnnotationKind::Data)) { + unsupportedTypeHandler(TStringBuilder() << "unsupported data types"); + return false; + } + if (!supportedDataTypes.contains(type->Cast<TDataExprType>()->GetSlot())) { + unsupportedTypeHandler(TStringBuilder() << "unsupported data type: " << type->Cast<TDataExprType>()->GetSlot()); + return false; + } + } else if (type->GetKind() == ETypeAnnotationKind::Pg) { + if (!supported.contains(ETypeAnnotationKind::Pg)) { + unsupportedTypeHandler(TStringBuilder() << "unsupported pg"); + return false; + } + auto name = type->Cast<TPgExprType>()->GetName(); + if (name == "float4" && !supportedDataTypes.contains(NUdf::EDataSlot::Float)) { + unsupportedTypeHandler(TStringBuilder() << "PgFloat4 unsupported yet since float is no supported"); + return false; + } + } else { + unsupportedTypeHandler(TStringBuilder() << "unsupported annotation kind: " << type->GetKind()); + return false; + } + return true; + }; + + TVector<const TTypeAnnotationNode*> stack(typesToCheck.begin(), typesToCheck.end()); + while (!stack.empty()) { + auto el = stack.back(); + stack.pop_back(); + if (el->GetKind() == ETypeAnnotationKind::Optional) { + stack.push_back(el->Cast<TOptionalExprType>()->GetItemType()); + continue; + } + if (!supported.contains(el->GetKind())) { + unsupportedTypeHandler(TStringBuilder() << "unsupported " << el->GetKind()); + return false; + } + if (el->GetKind() == ETypeAnnotationKind::Tuple) { + for (auto e: el->Cast<TTupleExprType>()->GetItems()) { + stack.push_back(e); + } + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Struct) { + for (auto e: el->Cast<TStructExprType>()->GetItems()) { + stack.push_back(e->GetItemType()); + } + continue; + } else if (el->GetKind() == ETypeAnnotationKind::List) { + stack.push_back(el->Cast<TListExprType>()->GetItemType()); + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Dict) { + stack.push_back(el->Cast<TDictExprType>()->GetKeyType()); + stack.push_back(el->Cast<TDictExprType>()->GetPayloadType()); + continue; + } else if (el->GetKind() == ETypeAnnotationKind::Variant) { + stack.push_back(el->Cast<TVariantExprType>()->GetUnderlyingType()); + continue; + } + if (!checkType(el)) { + return false; + } + } + return true; +} } diff --git a/yql/essentials/core/yql_opt_utils.h b/yql/essentials/core/yql_opt_utils.h index a961489029..91ad4d2d3c 100644 --- a/yql/essentials/core/yql_opt_utils.h +++ b/yql/essentials/core/yql_opt_utils.h @@ -168,4 +168,6 @@ TPartOfConstraintBase::TSetType GetPathsToKeys(const TExprNode& body, const TExp // prefix should start with "_yql" TVector<TString> GenNoClashColumns(const TStructExprType& source, TStringBuf prefix, size_t count); +bool CheckSupportedTypes(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& typesSupported, const TSet<NUdf::EDataSlot>& dataSlotsSupported, std::function<void(const TString&)> unsupportedTypeHandler); + } diff --git a/yql/essentials/providers/common/codec/arrow/ya.make b/yql/essentials/providers/common/codec/arrow/ya.make new file mode 100644 index 0000000000..eaa0d9d62a --- /dev/null +++ b/yql/essentials/providers/common/codec/arrow/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +PEERDIR( + contrib/libs/apache/arrow +) + +SRCS( + yql_codec_buf_input_stream.cpp +) + +END() diff --git a/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.cpp b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.cpp new file mode 100644 index 0000000000..6d131cb276 --- /dev/null +++ b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.cpp @@ -0,0 +1,37 @@ +#include "yql_codec_buf_input_stream.h" + +#include <yql/essentials/public/udf/arrow/defs.h> + +#include <arrow/buffer.h> +#include <arrow/buffer.h> + +namespace NYql { +namespace NCommon { + +arrow::Result<int64_t> TInputBufArrowInputStream::Read(int64_t bytesToRead, void* outBuffer) { + auto outBufferPtr = static_cast<char*>(outBuffer); + + YQL_ENSURE(bytesToRead > 0); + if (!Buffer_.TryRead(*outBufferPtr)) { + EOSReached_ = true; + return 0; + } + + Buffer_.ReadMany(outBufferPtr + 1, bytesToRead - 1); + BytesRead_ += bytesToRead; + return bytesToRead; +} + +arrow::Result<std::shared_ptr<arrow::Buffer>> TInputBufArrowInputStream::Read(int64_t nbytes) { + auto outBuffer = ARROW_RESULT(AllocateResizableBuffer(nbytes, Pool_)); + auto bytesRead = ARROW_RESULT(Read(nbytes, outBuffer->mutable_data())); + if (bytesRead == 0) { + return std::make_shared<arrow::Buffer>(nullptr, 0); + } + + YQL_ENSURE(bytesRead == nbytes); + return outBuffer; +} + +} // NCommon +} // NYql diff --git a/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.h b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.h new file mode 100644 index 0000000000..33ec2ee5e1 --- /dev/null +++ b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.h @@ -0,0 +1,45 @@ +#include <yql/essentials/providers/common/codec/yql_codec_buf.h> + +#include <arrow/io/interfaces.h> +#include <arrow/result.h> + +namespace NYql { +namespace NCommon { + +class TInputBufArrowInputStream : public arrow::io::InputStream { +public: + explicit TInputBufArrowInputStream(TInputBuf& buffer, arrow::MemoryPool* pool) + : Buffer_(buffer) + , Pool_(pool) + { + } + + arrow::Result<int64_t> Read(int64_t bytesToRead, void* outBuffer) override; + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override; + + arrow::Status Close() override { + return arrow::Status::OK(); + } + + arrow::Result<int64_t> Tell() const override { + return BytesRead_; + } + + bool closed() const override { + return false; + } + + bool EOSReached() const { + return EOSReached_; + } + +private: + TInputBuf& Buffer_; + int64_t BytesRead_ = 0; + bool EOSReached_ = false; + + arrow::MemoryPool* Pool_; +}; + +} // NCommon +} // NYql diff --git a/yql/essentials/providers/common/codec/ya.make b/yql/essentials/providers/common/codec/ya.make index daf15098c8..13ccdc726b 100644 --- a/yql/essentials/providers/common/codec/ya.make +++ b/yql/essentials/providers/common/codec/ya.make @@ -28,6 +28,10 @@ GENERATE_ENUM_SERIALIZATION(yql_codec_type_flags.h) END() +RECURSE( + arrow +) + RECURSE_FOR_TESTS( ut ) |