aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormrlolthe1st <mrlolthe1st@yandex-team.com>2023-02-10 18:37:45 +0300
committermrlolthe1st <mrlolthe1st@yandex-team.com>2023-02-10 18:37:45 +0300
commitd9cb9155799716cb0718fda1fcbc4f2d1d9876fb (patch)
tree28dfcd545da1ea4541bd121ffd8109a98009dbbd
parent130ad005b33e752ab1cb4933124cb380ebc5ca00 (diff)
downloadydb-d9cb9155799716cb0718fda1fcbc4f2d1d9876fb.tar.gz
Add ArrowReader
init
-rw-r--r--ydb/library/yql/providers/common/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/common/arrow/CMakeLists.darwin.txt24
-rw-r--r--ydb/library/yql/providers/common/arrow/CMakeLists.linux-aarch64.txt25
-rw-r--r--ydb/library/yql/providers/common/arrow/CMakeLists.linux.txt25
-rw-r--r--ydb/library/yql/providers/common/arrow/CMakeLists.txt15
-rw-r--r--ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp197
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/CMakeLists.darwin.txt20
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux-aarch64.txt21
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux.txt21
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/CMakeLists.txt15
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp20
-rw-r--r--ydb/library/yql/providers/common/arrow/interface/arrow_reader.h49
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt6
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt6
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt6
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp174
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp7
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h4
-rw-r--r--ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt1
-rw-r--r--ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt1
22 files changed, 595 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/CMakeLists.txt b/ydb/library/yql/providers/common/CMakeLists.txt
index 2aa69b6595a..b227fb9348a 100644
--- a/ydb/library/yql/providers/common/CMakeLists.txt
+++ b/ydb/library/yql/providers/common/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(activation)
+add_subdirectory(arrow)
add_subdirectory(arrow_resolve)
add_subdirectory(codec)
add_subdirectory(comp_nodes)
diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.darwin.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..62172bc6da5
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/CMakeLists.darwin.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(interface)
+
+add_library(providers-common-arrow)
+target_compile_options(providers-common-arrow PRIVATE
+ -DARCADIA_BUILD
+ -DUSE_PARQUET
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-common-arrow PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ common-arrow-interface
+)
+target_sources(providers-common-arrow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
+)
diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..88697c69b08
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(interface)
+
+add_library(providers-common-arrow)
+target_compile_options(providers-common-arrow PRIVATE
+ -DARCADIA_BUILD
+ -DUSE_PARQUET
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-common-arrow PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ common-arrow-interface
+)
+target_sources(providers-common-arrow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
+)
diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.linux.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.linux.txt
new file mode 100644
index 00000000000..88697c69b08
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/CMakeLists.linux.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(interface)
+
+add_library(providers-common-arrow)
+target_compile_options(providers-common-arrow PRIVATE
+ -DARCADIA_BUILD
+ -DUSE_PARQUET
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-common-arrow PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ common-arrow-interface
+)
+target_sources(providers-common-arrow PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
+)
diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.txt
new file mode 100644
index 00000000000..5bb4faffb40
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
new file mode 100644
index 00000000000..e07a47b5aab
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
@@ -0,0 +1,197 @@
+#include "interface/arrow_reader.h"
+#include <util/thread/pool.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <parquet/arrow/reader.h>
+#include <arrow/io/api.h>
+#include <arrow/util/future.h>
+#include <parquet/file_reader.h>
+
+#include <iostream>
+#include <sstream>
+
+#define THROW_ARROW_NOT_OK(status) \
+ do \
+ { \
+ if (::arrow::Status _s = (status); !_s.ok()) \
+ throw yexception() << _s.ToString(); \
+ } while (false)
+
+namespace NYql {
+
+class TArrowReader : public IArrowReader {
+public:
+ TArrowReader(const TArrowReaderSettings& settings) {
+ ThreadPool = CreateThreadPool(settings.PoolSize);
+ }
+
+ NThreading::TFuture<TSchemaResponse> GetSchema(const TArrowFileDesc& desc) const override final;
+ NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex, const std::vector<int>& columnIndices) const override final;
+ virtual ~TArrowReader() {
+
+ }
+private:
+ THolder<IThreadPool> ThreadPool;
+};
+
+IArrowReader::TPtr MakeArrowReader(const TArrowReaderSettings& settings) {
+ return std::make_shared<TArrowReader>(settings);
+}
+
+
+using ArrowFileReaderPtr = std::unique_ptr<parquet::arrow::FileReader>;
+
+class TS3RandomAccessFile : public arrow::io::RandomAccessFile {
+public:
+ explicit TS3RandomAccessFile(const TArrowFileDesc& desc) : Gateway(desc.Gateway), Headers(desc.Headers), RetryPolicy(desc.RetryPolicy), Url(desc.Url), FileSize(desc.Size) {
+
+ }
+
+ virtual arrow::Result<int64_t> GetSize() override {
+ return FileSize;
+ }
+
+ virtual arrow::Result<int64_t> Tell() const override {
+ return InnerPos;
+ }
+
+ virtual arrow::Status Seek(int64_t position) override {
+ InnerPos = position;
+ return {};
+ }
+
+ virtual arrow::Status Close() override {
+ return {};
+ }
+
+ virtual bool closed() const override {
+ return false;
+ }
+
+ virtual arrow::Result<int64_t> Read(int64_t, void* ) override {
+ Y_VERIFY(0);
+ return arrow::Result<int64_t>();
+ }
+
+ virtual arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t) override {
+ Y_VERIFY(0);
+ return arrow::Result<std::shared_ptr<arrow::Buffer>>();
+ }
+
+ virtual arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
+ try {
+ auto promise = NThreading::NewPromise<TString>();
+ Gateway->Download(Url, Headers,
+ position,
+ nbytes,
+ std::bind(&OnResult, promise, std::placeholders::_1),
+ {},
+ RetryPolicy);
+ return arrow::Buffer::FromString(promise.GetFuture().GetValueSync());
+ } catch (const std::exception& e) {
+ return arrow::Status::UnknownError(e.what());
+ }
+ }
+
+ virtual arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(const arrow::io::IOContext&, int64_t,
+ int64_t) override
+ {
+ Y_VERIFY(0);
+ return arrow::Future<std::shared_ptr<arrow::Buffer>>();
+ }
+
+
+private:
+ static void OnResult(NThreading::TPromise<TString> promise, IHTTPGateway::TResult&& result) {
+ try {
+ auto res = std::get<IHTTPGateway::TContent>(result).Extract();
+ promise.SetValue(res);
+ } catch (const std::exception& e) {
+ promise.SetException(std::current_exception());
+ }
+ }
+
+ IHTTPGateway::TPtr Gateway;
+ IHTTPGateway::THeaders Headers;
+ IRetryPolicy<long>::TPtr RetryPolicy;
+ const TString Url;
+ const size_t FileSize;
+ int64_t InnerPos = 0;
+};
+
+
+struct TFileReaderWrapper {
+public:
+ TFileReaderWrapper(const TArrowFileDesc& desc) {
+ if (desc.Contents) {
+ ArrowFile = std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(*desc.Contents));
+ } else {
+ if (desc.IsLocal) {
+ ArrowFile = arrow::io::ReadableFile::Open(desc.Url.substr(7), arrow::default_memory_pool()).ValueOrDie();
+ } else {
+ ArrowFile = std::make_shared<TS3RandomAccessFile>(desc);
+ }
+ }
+
+ THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(ArrowFile, arrow::default_memory_pool(), &FileReader));
+ }
+
+ ArrowFileReaderPtr FileReader;
+ std::shared_ptr<arrow::io::RandomAccessFile> ArrowFile;
+};
+
+NThreading::TFuture<IArrowReader::TSchemaResponse> TArrowReader::GetSchema(const TArrowFileDesc& desc) const
+{
+ auto promise = NThreading::NewPromise<TSchemaResponse>();
+ auto future = promise.GetFuture();
+
+ YQL_ENSURE(desc.Format == "parquet");
+
+ if (!ThreadPool->AddFunc([desc, promise] () mutable {
+ YQL_ENSURE(desc.Format == "parquet");
+ try {
+ TFileReaderWrapper wrapper(desc);
+
+ std::shared_ptr<arrow::Schema> schema;
+
+ THROW_ARROW_NOT_OK(wrapper.FileReader->GetSchema(&schema));
+
+ promise.SetValue(TSchemaResponse(schema, wrapper.FileReader->num_row_groups()));
+ } catch (const std::exception&) {
+ promise.SetException(std::current_exception());
+ }
+ }))
+ {
+ promise.SetException("AddFunc to ThreadPool failed");
+ }
+
+ return future;
+}
+
+NThreading::TFuture<std::shared_ptr<arrow::Table>> TArrowReader::ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex,
+ const std::vector<int>& columnIndices) const
+{
+ auto promise = NThreading::NewPromise<std::shared_ptr<arrow::Table>>();
+ auto future = promise.GetFuture();
+
+ if (!ThreadPool->AddFunc([desc, promise, rowGroupIndex, columnIndices] () mutable {
+ YQL_ENSURE(desc.Format == "parquet");
+ try {
+ TFileReaderWrapper wrapper(desc);
+
+ std::shared_ptr<arrow::Table> currentTable;
+
+ THROW_ARROW_NOT_OK(wrapper.FileReader->ReadRowGroup(rowGroupIndex, columnIndices, &currentTable));
+
+ promise.SetValue(currentTable);
+ } catch (const std::exception&) {
+ promise.SetException(std::current_exception());
+ }
+ }))
+ {
+ promise.SetException("AddFunc to ThreadPool failed");
+ }
+
+ return future;
+}
+}
diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.darwin.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..adefcdf8d59
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.darwin.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(common-arrow-interface)
+target_link_libraries(common-arrow-interface PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ cpp-threading-future
+ providers-common-http_gateway
+)
+target_sources(common-arrow-interface PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
+)
diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..35350df995e
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(common-arrow-interface)
+target_link_libraries(common-arrow-interface PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ cpp-threading-future
+ providers-common-http_gateway
+)
+target_sources(common-arrow-interface PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
+)
diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux.txt
new file mode 100644
index 00000000000..35350df995e
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(common-arrow-interface)
+target_link_libraries(common-arrow-interface PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ cpp-threading-future
+ providers-common-http_gateway
+)
+target_sources(common-arrow-interface PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
+)
diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.txt
new file mode 100644
index 00000000000..5bb4faffb40
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
new file mode 100644
index 00000000000..beae3d57b52
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
@@ -0,0 +1,20 @@
+#include "arrow_reader.h"
+
+namespace NYql {
+TArrowFileDesc::TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format)
+ : Url(url), Gateway(gateway), Headers(headers), RetryPolicy(retryPolicy), Format(format), Size(size), IsLocal(url.StartsWith("file://"))
+{
+
+}
+
+IArrowReader::TSchemaResponse::TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups) : Schema(schema), NumRowGroups(numRowGroups)
+{
+
+}
+
+TArrowReaderSettings::TArrowReaderSettings(size_t poolSize) : PoolSize(poolSize)
+{
+
+}
+
+}
diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
new file mode 100644
index 00000000000..4fb283377a4
--- /dev/null
+++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+#include <util/generic/string.h>
+#include <arrow/api.h>
+#include <vector>
+#include <memory>
+
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+
+namespace NYql {
+class TArrowFileDesc {
+public:
+ TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format = "");
+ TString Url;
+ IHTTPGateway::TPtr Gateway;
+ IHTTPGateway::THeaders Headers;
+ IRetryPolicy<long>::TPtr RetryPolicy;
+ TString Format;
+ size_t Size;
+ bool IsLocal;
+ TMaybe<TString> Contents;
+};
+
+class TArrowReaderSettings {
+public:
+ explicit TArrowReaderSettings(size_t poolSize = 10);
+ size_t PoolSize;
+};
+
+class IArrowReader {
+public:
+ using TPtr = std::shared_ptr<IArrowReader>;
+
+ class TSchemaResponse {
+ public:
+ TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups);
+ std::shared_ptr<arrow::Schema> Schema;
+ int NumRowGroups;
+ };
+
+ virtual NThreading::TFuture<TSchemaResponse> GetSchema(const TArrowFileDesc& desc) const = 0;
+ virtual NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex, const std::vector<int>& columnIndices) const = 0;
+ virtual ~IArrowReader() = default;
+};
+
+IArrowReader::TPtr MakeArrowReader(const TArrowReaderSettings& settings);
+
+}
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
index 5aec2cacd7b..30b8f4cc172 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
@@ -28,11 +28,13 @@ target_link_libraries(providers-s3-actors PUBLIC
cpp-string_utils-quote
cpp-xml-document
yql-minikql-computation
- common-token_accessor-client
- common-schema-mkql
yql-public-types
dq-actors-compute
+ common-token_accessor-client
+ common-schema-mkql
providers-common-http_gateway
+ providers-common-arrow
+ common-arrow-interface
providers-s3-common
providers-s3-compressors
providers-s3-proto
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt
index c6206372321..6971d9cdc8c 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt
@@ -29,11 +29,13 @@ target_link_libraries(providers-s3-actors PUBLIC
cpp-string_utils-quote
cpp-xml-document
yql-minikql-computation
- common-token_accessor-client
- common-schema-mkql
yql-public-types
dq-actors-compute
+ common-token_accessor-client
+ common-schema-mkql
providers-common-http_gateway
+ providers-common-arrow
+ common-arrow-interface
providers-s3-common
providers-s3-compressors
providers-s3-proto
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
index c6206372321..6971d9cdc8c 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
@@ -29,11 +29,13 @@ target_link_libraries(providers-s3-actors PUBLIC
cpp-string_utils-quote
cpp-xml-document
yql-minikql-computation
- common-token_accessor-client
- common-schema-mkql
yql-public-types
dq-actors-compute
+ common-token_accessor-client
+ common-schema-mkql
providers-common-http_gateway
+ providers-common-arrow
+ common-arrow-interface
providers-s3-common
providers-s3-compressors
providers-s3-proto
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 3f49dbd6fad..1171dcab809 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -136,6 +136,7 @@ struct TEvPrivate {
EvFileFinished,
EvPause,
EvContinue,
+ EvFutureResolved,
EvEnd
};
@@ -160,6 +161,10 @@ struct TEvPrivate {
IHTTPGateway::TCountedContent Result;
};
+ struct TEvFutureResolved : public TEventLocal<TEvFutureResolved, EvFutureResolved> {
+ TEvFutureResolved() {}
+ };
+
struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> {
explicit TEvReadStarted(long httpResponseCode) : HttpResponseCode(httpResponseCode) {}
const long HttpResponseCode;
@@ -525,6 +530,7 @@ struct TRetryStuff {
, SizeLimit(sizeLimit)
, TxId(txId)
, RequestId(requestId)
+ , RetryState(retryPolicy->CreateRetryState())
, RetryPolicy(retryPolicy)
, Cancelled(false)
{}
@@ -535,8 +541,8 @@ struct TRetryStuff {
std::size_t Offset, SizeLimit;
const TTxId TxId;
const TString RequestId;
- const IRetryPolicy<long>::TPtr RetryPolicy;
IRetryPolicy<long>::IRetryState::TPtr RetryState;
+ IRetryPolicy<long>::TPtr RetryPolicy;
IHTTPGateway::TCancelHook CancelHook;
TMaybe<TDuration> NextRetryDelay;
std::atomic_bool Cancelled;
@@ -609,12 +615,15 @@ using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::
class TArrowParquetBatchReader : public IBatchReader<std::shared_ptr<arrow::RecordBatch>> {
public:
- TArrowParquetBatchReader(std::unique_ptr<parquet::arrow::FileReader>&& fileReader, std::vector<int>&& columnIndices, std::vector<TColumnConverter>&& columnConverters)
- : FileReader(std::move(fileReader))
+ TArrowParquetBatchReader(TArrowFileDesc&& fileDesc, IArrowReader::TPtr arrowReader, int numRowGroups, std::vector<int>&& columnIndices, std::vector<TColumnConverter>&& columnConverters, std::function<void()>&& onFutureResolve, std::function<void()>&& waitForFutureResolve)
+ : FileDesc(std::move(fileDesc))
+ , ArrowReader(arrowReader)
, ColumnIndices(std::move(columnIndices))
, ColumnConverters(std::move(columnConverters))
- , TotalGroups(FileReader->num_row_groups())
+ , TotalGroups(numRowGroups)
, CurrentGroup(0)
+ , OnFutureResolve(std::move(onFutureResolve))
+ , WaitForFutureResolve(std::move(waitForFutureResolve))
{}
bool Next(std::shared_ptr<arrow::RecordBatch>& value) final {
@@ -624,7 +633,12 @@ public:
}
if (!CurrentBatchReader) {
- THROW_ARROW_NOT_OK(FileReader->ReadRowGroup(CurrentGroup++, ColumnIndices, &CurrentTable));
+ auto future = ArrowReader->ReadRowGroup(FileDesc, CurrentGroup++, ColumnIndices);
+ future.Subscribe([this](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){
+ OnFutureResolve();
+ });
+ WaitForFutureResolve();
+ CurrentTable = future.GetValue();
CurrentBatchReader = std::make_unique<arrow::TableBatchReader>(*CurrentTable);
}
@@ -648,13 +662,16 @@ public:
}
private:
- std::unique_ptr<parquet::arrow::FileReader> FileReader;
+ TArrowFileDesc FileDesc;
+ IArrowReader::TPtr ArrowReader;
const std::vector<int> ColumnIndices;
std::vector<TColumnConverter> ColumnConverters;
const int TotalGroups;
int CurrentGroup;
std::shared_ptr<arrow::Table> CurrentTable;
std::unique_ptr<arrow::TableBatchReader> CurrentBatchReader;
+ std::function<void()> OnFutureResolve;
+ std::function<void()> WaitForFutureResolve;
};
ui64 GetSizeOfData(const arrow::ArrayData& data) {
@@ -711,16 +728,16 @@ private:
static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
public:
- TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
- const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
- const TString& path, const TString& url, const std::size_t maxBlocksInFly,
+ TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
+ const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
+ const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps)
- : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
- TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
- PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly),
+ : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
+ TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
+ PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader),
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps)
{}
@@ -730,6 +747,10 @@ public:
}
}
+ bool IsDownloadNeeded() const {
+ return !ReadSpec->Arrow || !ReadSpec->Compression.empty();
+ }
+
bool Next(TString& value) {
if (InputFinished)
return false;
@@ -880,25 +901,66 @@ private:
TIssue exceptIssue;
bool isLocal = Url.StartsWith("file://");
+ bool needWaitFinish = !isLocal;
try {
- std::unique_ptr<NDB::ReadBuffer> buffer;
- if (isLocal) {
- buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path);
- } else {
- buffer = std::make_unique<TReadBufferFromStream>(this);
- }
-
- const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
- YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
if (ReadSpec->Arrow) {
- YQL_ENSURE(ReadSpec->Format == "parquet");
- std::unique_ptr<parquet::arrow::FileReader> fileReader;
- auto arrowFile = NDB::asArrowFile(decompress ? *decompress : *buffer);
+ TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format);
+ if (IsDownloadNeeded()) {
+ // Read file entirely
+ std::unique_ptr<NDB::ReadBuffer> buffer;
+ if (isLocal) {
+ buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path);
+ } else {
+ buffer = std::make_unique<TReadBufferFromStream>(this);
+ }
+
+ const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
+ YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
+ auto& readBuffer = decompress ? *decompress : *buffer;
+ TStringBuilder sb;
+ TBuffer buff(256_KB);
+
+ while (!readBuffer.eof()) {
+ if (!readBuffer.hasPendingData()) {
+ if (!readBuffer.next()) {
+ break;
+ }
+ }
+ auto bytesReaded = readBuffer.read(buff.data(), 256_KB);
+ sb.append(buff.data(), buff.data() + bytesReaded);
+ }
- THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(arrowFile, arrow::default_memory_pool(), &fileReader));
- std::shared_ptr<arrow::Schema> schema;
- THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
+ fileDesc.Contents = sb;
+ } else {
+ needWaitFinish = false;
+ }
+ auto actorSystem = GetActorSystem();
+ auto onResolve = [actorSystem, actorId = this->SelfActorId] {
+ actorSystem->Send(new IEventHandle(actorId, actorId, new TEvPrivate::TEvFutureResolved()));
+ };
+ auto waitForResolve = [&] {
+ auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+ TVector<THolder<IEventBase>> otherEvents;
+ while (!event->CastAsLocal<TEvPrivate::TEvFutureResolved>()) {
+ if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
+ throw TS3ReadAbort();
+ }
+ otherEvents.push_back(event->ReleaseBase());
+ event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+ }
+
+ for (auto &e: otherEvents) {
+ Send(SelfActorId, e.Release());
+ }
+ };
+ auto future = ArrowReader->GetSchema(fileDesc);
+ future.Subscribe([onResolve](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) {
+ onResolve();
+ });
+ waitForResolve();
+ auto result = future.GetValue();
+ std::shared_ptr<arrow::Schema> schema = result.Schema;
std::vector<int> columnIndices;
std::vector<TColumnConverter> columnConverters;
for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) {
@@ -924,9 +986,25 @@ private:
columnIndices.push_back(srcFieldIndex);
}
- TArrowParquetBatchReader reader(std::move(fileReader), std::move(columnIndices), std::move(columnConverters));
+ TArrowParquetBatchReader reader(std::move(fileDesc),
+ ArrowReader,
+ result.NumRowGroups,
+ std::move(columnIndices),
+ std::move(columnConverters),
+ onResolve,
+ waitForResolve);
ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal);
} else {
+ std::unique_ptr<NDB::ReadBuffer> buffer;
+ if (isLocal) {
+ buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path);
+ } else {
+ buffer = std::make_unique<TReadBufferFromStream>(this);
+ }
+
+ const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
+ YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
+
auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings));
TBlockReader reader(std::move(stream));
ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal);
@@ -951,7 +1029,7 @@ private:
RetryStuff->Cancel();
}
- if (!isLocal) {
+ if (needWaitFinish) {
WaitFinish();
}
@@ -988,13 +1066,30 @@ private:
auto selfActorId = SelfActorId;
size_t cntBlocksInFly = 0;
if (isLocal) {
+ auto waitProcessed = [&] {
+ auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+ TVector<THolder<IEventBase>> otherEvents;
+ while (!event->CastAsLocal<TEvPrivate::TEvBlockProcessed>()) {
+ if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
+ throw TS3ReadAbort();
+ }
+ otherEvents.push_back(event->ReleaseBase());
+ event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>();
+ }
+
+ for (auto& e: otherEvents) {
+ Send(SelfActorId, e.Release());
+ }
+ };
+
for (;;) {
T batch;
+
if (!reader.Next(batch)) {
break;
}
if (++cntBlocksInFly > MaxBlocksInFly) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ waitProcessed();
--cntBlocksInFly;
}
Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() {
@@ -1002,7 +1097,7 @@ private:
}, GetIngressDelta()));
}
while (cntBlocksInFly--) {
- WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ waitProcessed();
}
} else {
for (;;) {
@@ -1078,6 +1173,7 @@ private:
std::size_t LastOffset = 0;
TString LastData;
std::size_t MaxBlocksInFly = 2;
+ IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
bool Paused = false;
std::queue<THolder<IEventHandle>> DeferredEvents;
@@ -1088,16 +1184,17 @@ private:
class TS3ReadCoroActor : public TActorCoro {
public:
- TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize)
+ TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, bool isDownloadNeeded, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize)
: TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
, RetryStuff(std::move(retryStuff))
, PathIndex(pathIndex)
+ , IsDownloadNeeded(isDownloadNeeded)
, HttpInflightSize(httpInflightSize)
{}
private:
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
- if (RetryStuff->Url.substr(0, 6) != "file://") {
+ if (IsDownloadNeeded && RetryStuff->Url.substr(0, 6) != "file://") {
LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]");
DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex, HttpInflightSize);
}
@@ -1105,6 +1202,7 @@ private:
const TRetryStuff::TPtr RetryStuff;
const size_t PathIndex;
+ const bool IsDownloadNeeded;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
};
@@ -1124,6 +1222,7 @@ public:
const NActors::TActorId& computeActorId,
const IRetryPolicy<long>::TPtr& retryPolicy,
const std::size_t maxBlocksInFly,
+ IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters
@@ -1142,6 +1241,7 @@ public:
, ReadSpec(readSpec)
, Count(Paths.size())
, MaxBlocksInFly(maxBlocksInFly)
+ , ArrowReader(arrowReader)
, Counters(counters)
, TaskCounters(taskCounters)
{
@@ -1201,8 +1301,8 @@ public:
HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream());
}
::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter;
- auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps);
- CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->HttpInflightSize).release()));
+ auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ArrowReader, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps);
+ CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->IsDownloadNeeded(), impl->HttpInflightSize).release()));
}
static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR";
@@ -1486,6 +1586,7 @@ private:
std::deque<TReadyBlock> Blocks;
ui32 Count;
const std::size_t MaxBlocksInFly;
+ IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
size_t CurrentPathIndex = 0;
mutable TInstant LastMemoryReport = TInstant::Now();
@@ -1654,6 +1755,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const IRetryPolicy<long>::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
+ IArrowReader::TPtr arrowReader,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters)
{
@@ -1769,7 +1871,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
maxBlocksInFly = FromString<ui64>(it->second);
const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy,
- maxBlocksInFly, cfg, counters, taskCounters);
+ maxBlocksInFly, arrowReader, cfg, counters, taskCounters);
return {actor, actor};
} else {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 528aec20c38..60c227cfece 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -5,6 +5,7 @@
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
#include <ydb/library/yql/providers/s3/proto/source.pb.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h>
#include <library/cpp/actors/core/actor.h>
namespace NYql::NDq {
@@ -24,6 +25,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const IRetryPolicy<long>::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
+ IArrowReader::TPtr arrowReader,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters);
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index 8339ab757a1..fef83090b94 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -16,14 +16,15 @@ void RegisterS3ReadActorFactory(
IHTTPGateway::TPtr gateway,
const IRetryPolicy<long>::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
- ::NMonitoring::TDynamicCounterPtr counters) {
+ ::NMonitoring::TDynamicCounterPtr counters,
+ IArrowReader::TPtr arrowReader) {
#if defined(_linux_) || defined(_darwin_)
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
- [credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
+ [credentialsFactory, gateway, retryPolicy, cfg, counters, arrowReader](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
std::move(settings), args.InputIndex, args.TxId, args.SecureParams,
- args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
+ args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, arrowReader,
counters, args.TaskCounters);
});
#else
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 8a61b163961..60b1c0d4ff4 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -6,6 +6,7 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
+#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <util/generic/size_literals.h>
@@ -24,6 +25,7 @@ void RegisterS3ReadActorFactory(
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
const TS3ReadActorFactoryConfig& = {},
- ::NMonitoring::TDynamicCounterPtr counters = nullptr);
+ ::NMonitoring::TDynamicCounterPtr counters = nullptr,
+ IArrowReader::TPtr arrowReader = MakeArrowReader(TArrowReaderSettings()));
}
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt
index 1716e4591ba..687d9330bdb 100644
--- a/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt
+++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt
@@ -26,6 +26,7 @@ target_link_libraries(providers-s3-compressors PUBLIC
contrib-libs-lz4
contrib-libs-lzma
contrib-libs-zstd
+ clickhouse_client_udf
)
target_sources(providers-s3-compressors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt
index 29593593d83..55350bf2db6 100644
--- a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt
@@ -27,6 +27,7 @@ target_link_libraries(providers-s3-compressors PUBLIC
contrib-libs-lz4
contrib-libs-lzma
contrib-libs-zstd
+ clickhouse_client_udf
)
target_sources(providers-s3-compressors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp
diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
index 29593593d83..55350bf2db6 100644
--- a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt
@@ -27,6 +27,7 @@ target_link_libraries(providers-s3-compressors PUBLIC
contrib-libs-lz4
contrib-libs-lzma
contrib-libs-zstd
+ clickhouse_client_udf
)
target_sources(providers-s3-compressors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp