aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: aneporada <aneporada@yandex-team.com> 2024-11-12 21:53:49 +0300
committer: Maxim Yurchuk <maxim-yurchuk@ydb.tech> 2024-11-12 22:40:29 +0300
commit: 7d04dad87f5a9cccc95be049327b57c915eded63 (patch)
tree: 5f86b91f6a2484736ef79d45b4d1b5379c895134
parent: db924b116e251d6b6127b94bd10ae875d5a4b488 (diff)
download: ydb-7d04dad87f5a9cccc95be049327b57c915eded63.tar.gz
Merge PR #10587: Block input for YT map operations
initial commit_hash:61c8442fd8a0ebe277511b5d98b334cf6bc95337
-rw-r--r-- yql/essentials/core/yql_opt_utils.cpp | 90
-rw-r--r-- yql/essentials/core/yql_opt_utils.h | 2
-rw-r--r-- yql/essentials/providers/common/codec/arrow/ya.make | 11
-rw-r--r-- yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.cpp | 37
-rw-r--r-- yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.h | 45
-rw-r--r-- yql/essentials/providers/common/codec/ya.make | 4
6 files changed, 189 insertions, 0 deletions
diff --git a/yql/essentials/core/yql_opt_utils.cpp b/yql/essentials/core/yql_opt_utils.cpp
index 9b39558176..4e9079e9ae 100644
--- a/yql/essentials/core/yql_opt_utils.cpp
+++ b/yql/essentials/core/yql_opt_utils.cpp
@@ -2216,5 +2216,95 @@ TVector<TString> GenNoClashColumns(const TStructExprType& source, TStringBuf pre
return result;
}
+// Verifies that every type in typesToCheck, together with everything nested inside it,
+// is allowed by the caller-provided whitelists.
+//
+// supportedTypes names the allowed non-data kinds: "pg", "tuple", "struct", "dict", "list",
+// "variant". Any other name is reported as "unknown type" and fails the whole check.
+// Data types are allowed only for the slots listed in supportedDataTypes.
+// Optional wrappers are always looked through and never need to be whitelisted.
+// On the first violation unsupportedTypeHandler receives a description and false is returned;
+// true means every visited type passed.
+bool CheckSupportedTypes(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler) {
+    struct TKindName {
+        const char* Name;
+        ETypeAnnotationKind Kind;
+    };
+    static constexpr TKindName knownKinds[] = {
+        {"pg", ETypeAnnotationKind::Pg},
+        {"tuple", ETypeAnnotationKind::Tuple},
+        {"struct", ETypeAnnotationKind::Struct},
+        {"dict", ETypeAnnotationKind::Dict},
+        {"list", ETypeAnnotationKind::List},
+        {"variant", ETypeAnnotationKind::Variant},
+    };
+
+    // Translate the textual whitelist into annotation kinds.
+    TSet<ETypeAnnotationKind> allowedKinds;
+    for (const auto& typeName : supportedTypes) {
+        const TKindName* match = nullptr;
+        for (const auto& candidate : knownKinds) {
+            if (typeName == candidate.Name) {
+                match = &candidate;
+                break;
+            }
+        }
+        if (!match) {
+            // Unknown type
+            unsupportedTypeHandler(TStringBuilder() << "unknown type: " << typeName);
+            return false;
+        }
+        allowedKinds.insert(match->Kind);
+    }
+    // Data as a kind is allowed as soon as at least one slot is whitelisted.
+    if (!supportedDataTypes.empty()) {
+        allowedKinds.insert(ETypeAnnotationKind::Data);
+    }
+
+    // Validates a non-container type (anything the traversal below does not unwrap).
+    const auto isLeafSupported = [&](const TTypeAnnotationNode* leaf) -> bool {
+        switch (leaf->GetKind()) {
+        case ETypeAnnotationKind::Data: {
+            if (!allowedKinds.contains(ETypeAnnotationKind::Data)) {
+                unsupportedTypeHandler(TStringBuilder() << "unsupported data types");
+                return false;
+            }
+            const auto slot = leaf->Cast<TDataExprType>()->GetSlot();
+            if (!supportedDataTypes.contains(slot)) {
+                unsupportedTypeHandler(TStringBuilder() << "unsupported data type: " << slot);
+                return false;
+            }
+            return true;
+        }
+        case ETypeAnnotationKind::Pg: {
+            if (!allowedKinds.contains(ETypeAnnotationKind::Pg)) {
+                unsupportedTypeHandler(TStringBuilder() << "unsupported pg");
+                return false;
+            }
+            const auto pgName = leaf->Cast<TPgExprType>()->GetName();
+            // pg float4 is accepted only when the Float data slot is whitelisted too.
+            if (pgName == "float4" && !supportedDataTypes.contains(NUdf::EDataSlot::Float)) {
+                unsupportedTypeHandler(TStringBuilder() << "PgFloat4 unsupported yet since float is no supported");
+                return false;
+            }
+            return true;
+        }
+        default:
+            unsupportedTypeHandler(TStringBuilder() << "unsupported annotation kind: " << leaf->GetKind());
+            return false;
+        }
+    };
+
+    // Iterative depth-first walk over the type trees; containers push their children.
+    TVector<const TTypeAnnotationNode*> pending(typesToCheck.begin(), typesToCheck.end());
+    while (!pending.empty()) {
+        const TTypeAnnotationNode* current = pending.back();
+        pending.pop_back();
+        const auto kind = current->GetKind();
+
+        if (kind == ETypeAnnotationKind::Optional) {
+            pending.push_back(current->Cast<TOptionalExprType>()->GetItemType());
+            continue;
+        }
+        if (!allowedKinds.contains(kind)) {
+            unsupportedTypeHandler(TStringBuilder() << "unsupported " << kind);
+            return false;
+        }
+
+        switch (kind) {
+        case ETypeAnnotationKind::Tuple:
+            for (const auto* item : current->Cast<TTupleExprType>()->GetItems()) {
+                pending.push_back(item);
+            }
+            break;
+        case ETypeAnnotationKind::Struct:
+            for (const auto* field : current->Cast<TStructExprType>()->GetItems()) {
+                pending.push_back(field->GetItemType());
+            }
+            break;
+        case ETypeAnnotationKind::List:
+            pending.push_back(current->Cast<TListExprType>()->GetItemType());
+            break;
+        case ETypeAnnotationKind::Dict: {
+            const auto* dict = current->Cast<TDictExprType>();
+            pending.push_back(dict->GetKeyType());
+            pending.push_back(dict->GetPayloadType());
+            break;
+        }
+        case ETypeAnnotationKind::Variant:
+            pending.push_back(current->Cast<TVariantExprType>()->GetUnderlyingType());
+            break;
+        default:
+            if (!isLeafSupported(current)) {
+                return false;
+            }
+            break;
+        }
+    }
+    return true;
+}
}
diff --git a/yql/essentials/core/yql_opt_utils.h b/yql/essentials/core/yql_opt_utils.h
index a961489029..91ad4d2d3c 100644
--- a/yql/essentials/core/yql_opt_utils.h
+++ b/yql/essentials/core/yql_opt_utils.h
@@ -168,4 +168,6 @@ TPartOfConstraintBase::TSetType GetPathsToKeys(const TExprNode& body, const TExp
// prefix should start with "_yql"
TVector<TString> GenNoClashColumns(const TStructExprType& source, TStringBuf prefix, size_t count);
+// Returns true when every type in typesToCheck (including nested item/member types) is supported.
+// typesSupported: names of allowed non-data kinds - "pg", "tuple", "struct", "dict", "list", "variant";
+// any other name makes the check fail. dataSlotsSupported: data slots allowed for Data types.
+// Optional wrappers are unwrapped and do not need to be listed.
+// unsupportedTypeHandler is called with a description of the first problem found before false is returned.
+bool CheckSupportedTypes(const TTypeAnnotationNode::TListType& typesToCheck, const TSet<TString>& typesSupported, const TSet<NUdf::EDataSlot>& dataSlotsSupported, std::function<void(const TString&)> unsupportedTypeHandler);
+
}
diff --git a/yql/essentials/providers/common/codec/arrow/ya.make b/yql/essentials/providers/common/codec/arrow/ya.make
new file mode 100644
index 0000000000..eaa0d9d62a
--- /dev/null
+++ b/yql/essentials/providers/common/codec/arrow/ya.make
@@ -0,0 +1,11 @@
+# Library with the arrow::io::InputStream adapter over the codec input buffer.
+# NOTE(review): the sources also include headers from yql/essentials/public/udf/arrow and
+# yql/essentials/providers/common/codec - confirm whether those need PEERDIR entries here.
+LIBRARY()
+
+PEERDIR(
+ contrib/libs/apache/arrow
+)
+
+SRCS(
+ yql_codec_buf_input_stream.cpp
+)
+
+END()
diff --git a/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.cpp b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.cpp
new file mode 100644
index 0000000000..6d131cb276
--- /dev/null
+++ b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.cpp
@@ -0,0 +1,37 @@
+#include "yql_codec_buf_input_stream.h"
+
+#include <yql/essentials/public/udf/arrow/defs.h>
+
+#include <arrow/buffer.h>
+#include <arrow/buffer.h>
+
+namespace NYql {
+namespace NCommon {
+
+// Copies bytesToRead bytes from the wrapped TInputBuf into outBuffer.
+// Returns 0 and raises the end-of-stream flag when not even the first byte can be read;
+// otherwise returns bytesToRead and advances the Tell() position by the same amount.
+// A non-positive bytesToRead trips YQL_ENSURE.
+arrow::Result<int64_t> TInputBufArrowInputStream::Read(int64_t bytesToRead, void* outBuffer) {
+    YQL_ENSURE(bytesToRead > 0);
+
+    char* const dst = static_cast<char*>(outBuffer);
+    // The single-byte TryRead doubles as the end-of-input probe.
+    if (Buffer_.TryRead(*dst)) {
+        // NOTE(review): ReadMany presumably reports truncated input on its own - confirm in TInputBuf.
+        Buffer_.ReadMany(dst + 1, bytesToRead - 1);
+        BytesRead_ += bytesToRead;
+        return bytesToRead;
+    }
+
+    EOSReached_ = true;
+    return 0;
+}
+
+// Allocates an nbytes-sized buffer from Pool_ and fills it through Read(nbytes, ptr).
+// At end of input an empty arrow::Buffer is returned instead; a read that delivers
+// a non-zero byte count different from nbytes trips YQL_ENSURE.
+arrow::Result<std::shared_ptr<arrow::Buffer>> TInputBufArrowInputStream::Read(int64_t nbytes) {
+    auto storage = ARROW_RESULT(AllocateResizableBuffer(nbytes, Pool_));
+    const auto received = ARROW_RESULT(Read(nbytes, storage->mutable_data()));
+    if (received != 0) {
+        YQL_ENSURE(received == nbytes);
+        return storage;
+    }
+
+    // Nothing was read: hand back an empty buffer and drop the allocation.
+    return std::make_shared<arrow::Buffer>(nullptr, 0);
+}
+
+} // NCommon
+} // NYql
diff --git a/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.h b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.h
new file mode 100644
index 0000000000..33ec2ee5e1
--- /dev/null
+++ b/yql/essentials/providers/common/codec/arrow/yql_codec_buf_input_stream.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <yql/essentials/providers/common/codec/yql_codec_buf.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/result.h>
+
+namespace NYql {
+namespace NCommon {
+
+// Exposes a TInputBuf through the arrow::io::InputStream interface.
+//
+// The buffer is held by reference and is not owned: it must outlive the stream.
+// The memory pool is used by Read(int64_t) to allocate the returned buffers.
+class TInputBufArrowInputStream : public arrow::io::InputStream {
+public:
+    explicit TInputBufArrowInputStream(TInputBuf& buffer, arrow::MemoryPool* pool)
+        : Buffer_(buffer)
+        , Pool_(pool)
+    {
+    }
+
+    // Reads bytesToRead bytes into outBuffer; returns 0 once the input is exhausted.
+    arrow::Result<int64_t> Read(int64_t bytesToRead, void* outBuffer) override;
+    // Same as above, but returns the data in a buffer allocated from the pool.
+    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
+
+    // Closing is a no-op: the stream owns nothing that has to be released.
+    arrow::Status Close() override {
+        return arrow::Status::OK();
+    }
+
+    // Number of bytes delivered by successful Read() calls so far.
+    arrow::Result<int64_t> Tell() const override {
+        return BytesRead_;
+    }
+
+    // Always reports the stream as open, even after Close().
+    bool closed() const override {
+        return false;
+    }
+
+    // True once a Read() call could not obtain any data from the buffer.
+    bool EOSReached() const {
+        return EOSReached_;
+    }
+
+private:
+    TInputBuf& Buffer_;
+    int64_t BytesRead_ = 0;
+    bool EOSReached_ = false;
+
+    arrow::MemoryPool* Pool_;
+};
+
+} // NCommon
+} // NYql
diff --git a/yql/essentials/providers/common/codec/ya.make b/yql/essentials/providers/common/codec/ya.make
index daf15098c8..13ccdc726b 100644
--- a/yql/essentials/providers/common/codec/ya.make
+++ b/yql/essentials/providers/common/codec/ya.make
@@ -28,6 +28,10 @@ GENERATE_ENUM_SERIALIZATION(yql_codec_type_flags.h)
END()
+# Also build the Arrow input-stream adapter library from the arrow/ subdirectory.
+RECURSE(
+ arrow
+)
+
RECURSE_FOR_TESTS(
ut
)