diff options
author | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-02-10 18:37:45 +0300 |
---|---|---|
committer | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-02-10 18:37:45 +0300 |
commit | d9cb9155799716cb0718fda1fcbc4f2d1d9876fb (patch) | |
tree | 28dfcd545da1ea4541bd121ffd8109a98009dbbd | |
parent | 130ad005b33e752ab1cb4933124cb380ebc5ca00 (diff) | |
download | ydb-d9cb9155799716cb0718fda1fcbc4f2d1d9876fb.tar.gz |
Add ArrowReader
init
22 files changed, 595 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/CMakeLists.txt b/ydb/library/yql/providers/common/CMakeLists.txt index 2aa69b6595a..b227fb9348a 100644 --- a/ydb/library/yql/providers/common/CMakeLists.txt +++ b/ydb/library/yql/providers/common/CMakeLists.txt @@ -7,6 +7,7 @@ add_subdirectory(activation) +add_subdirectory(arrow) add_subdirectory(arrow_resolve) add_subdirectory(codec) add_subdirectory(comp_nodes) diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.darwin.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.darwin.txt new file mode 100644 index 00000000000..62172bc6da5 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/CMakeLists.darwin.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(interface) + +add_library(providers-common-arrow) +target_compile_options(providers-common-arrow PRIVATE + -DARCADIA_BUILD + -DUSE_PARQUET + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-common-arrow PUBLIC + contrib-libs-cxxsupp + yutil + common-arrow-interface +) +target_sources(providers-common-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp +) diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..88697c69b08 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/CMakeLists.linux-aarch64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(interface) + +add_library(providers-common-arrow) +target_compile_options(providers-common-arrow PRIVATE + -DARCADIA_BUILD + -DUSE_PARQUET + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-common-arrow PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + common-arrow-interface +) +target_sources(providers-common-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp +) diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.linux.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.linux.txt new file mode 100644 index 00000000000..88697c69b08 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/CMakeLists.linux.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(interface) + +add_library(providers-common-arrow) +target_compile_options(providers-common-arrow PRIVATE + -DARCADIA_BUILD + -DUSE_PARQUET + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-common-arrow PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + common-arrow-interface +) +target_sources(providers-common-arrow PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp +) diff --git a/ydb/library/yql/providers/common/arrow/CMakeLists.txt b/ydb/library/yql/providers/common/arrow/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp new file mode 100644 index 00000000000..e07a47b5aab --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp @@ -0,0 +1,197 @@ +#include "interface/arrow_reader.h" +#include <util/thread/pool.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <parquet/arrow/reader.h> +#include <arrow/io/api.h> +#include <arrow/util/future.h> +#include <parquet/file_reader.h> + +#include <iostream> +#include <sstream> + +#define THROW_ARROW_NOT_OK(status) \ + do \ + { \ + if (::arrow::Status _s = (status); !_s.ok()) \ + throw yexception() << _s.ToString(); \ + } while (false) + +namespace NYql { + +class TArrowReader : public IArrowReader { +public: + TArrowReader(const TArrowReaderSettings& settings) { + ThreadPool = CreateThreadPool(settings.PoolSize); + } + + NThreading::TFuture<TSchemaResponse> GetSchema(const TArrowFileDesc& desc) const override final; + NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex, const std::vector<int>& columnIndices) const override final; + virtual ~TArrowReader() { + + } +private: + THolder<IThreadPool> ThreadPool; +}; + +IArrowReader::TPtr MakeArrowReader(const TArrowReaderSettings& settings) { + return std::make_shared<TArrowReader>(settings); +} + + +using ArrowFileReaderPtr = std::unique_ptr<parquet::arrow::FileReader>; + +class TS3RandomAccessFile : public arrow::io::RandomAccessFile { +public: + explicit TS3RandomAccessFile(const TArrowFileDesc& desc) : Gateway(desc.Gateway), Headers(desc.Headers), RetryPolicy(desc.RetryPolicy), Url(desc.Url), FileSize(desc.Size) { + + } + + virtual arrow::Result<int64_t> GetSize() override { + return FileSize; + } + + virtual arrow::Result<int64_t> Tell() const override { + return InnerPos; + } + + virtual arrow::Status Seek(int64_t position) override { + InnerPos = position; + return {}; + } + + virtual arrow::Status Close() override { + return {}; + } + + virtual bool closed() const override { + return false; + } + + virtual arrow::Result<int64_t> Read(int64_t, void* ) override { + Y_VERIFY(0); + return arrow::Result<int64_t>(); + } + + virtual arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t) override { + Y_VERIFY(0); + return arrow::Result<std::shared_ptr<arrow::Buffer>>(); + } + + virtual arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override { + try { + auto promise = NThreading::NewPromise<TString>(); + Gateway->Download(Url, Headers, + position, + nbytes, + std::bind(&OnResult, promise, std::placeholders::_1), + {}, + RetryPolicy); + return arrow::Buffer::FromString(promise.GetFuture().GetValueSync()); + } catch (const std::exception& e) { + return arrow::Status::UnknownError(e.what()); + } + } + + virtual arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(const arrow::io::IOContext&, int64_t, + int64_t) override + { + Y_VERIFY(0); + return arrow::Future<std::shared_ptr<arrow::Buffer>>(); + } + + +private: + static void OnResult(NThreading::TPromise<TString> promise, IHTTPGateway::TResult&& result) { + try { + auto res = std::get<IHTTPGateway::TContent>(result).Extract(); + promise.SetValue(res); + } catch (const std::exception& e) { + promise.SetException(std::current_exception()); + } + } + + IHTTPGateway::TPtr Gateway; + IHTTPGateway::THeaders Headers; + IRetryPolicy<long>::TPtr RetryPolicy; + const TString Url; + const size_t FileSize; + int64_t InnerPos = 0; +}; + + +struct TFileReaderWrapper { +public: + TFileReaderWrapper(const TArrowFileDesc& desc) { + if (desc.Contents) { + ArrowFile = std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(*desc.Contents)); + } else { + if (desc.IsLocal) { + ArrowFile = arrow::io::ReadableFile::Open(desc.Url.substr(7), arrow::default_memory_pool()).ValueOrDie(); + } else { + ArrowFile = std::make_shared<TS3RandomAccessFile>(desc); + } + } + + THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(ArrowFile, arrow::default_memory_pool(), &FileReader)); + } + + ArrowFileReaderPtr FileReader; + std::shared_ptr<arrow::io::RandomAccessFile> ArrowFile; +}; + +NThreading::TFuture<IArrowReader::TSchemaResponse> TArrowReader::GetSchema(const TArrowFileDesc& desc) const +{ + auto promise = NThreading::NewPromise<TSchemaResponse>(); + auto future = promise.GetFuture(); + + YQL_ENSURE(desc.Format == "parquet"); + + if (!ThreadPool->AddFunc([desc, promise] () mutable { + YQL_ENSURE(desc.Format == "parquet"); + try { + TFileReaderWrapper wrapper(desc); + + std::shared_ptr<arrow::Schema> schema; + + THROW_ARROW_NOT_OK(wrapper.FileReader->GetSchema(&schema)); + + promise.SetValue(TSchemaResponse(schema, wrapper.FileReader->num_row_groups())); + } catch (const std::exception&) { + promise.SetException(std::current_exception()); + } + })) + { + promise.SetException("AddFunc to ThreadPool failed"); + } + + return future; +} + +NThreading::TFuture<std::shared_ptr<arrow::Table>> TArrowReader::ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex, + const std::vector<int>& columnIndices) const +{ + auto promise = NThreading::NewPromise<std::shared_ptr<arrow::Table>>(); + auto future = promise.GetFuture(); + + if (!ThreadPool->AddFunc([desc, promise, rowGroupIndex, columnIndices] () mutable { + YQL_ENSURE(desc.Format == "parquet"); + try { + TFileReaderWrapper wrapper(desc); + + std::shared_ptr<arrow::Table> currentTable; + + THROW_ARROW_NOT_OK(wrapper.FileReader->ReadRowGroup(rowGroupIndex, columnIndices, ¤tTable)); + + promise.SetValue(currentTable); + } catch (const std::exception&) { + promise.SetException(std::current_exception()); + } + })) + { + promise.SetException("AddFunc to ThreadPool failed"); + } + + return future; +} +} diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.darwin.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.darwin.txt new file mode 100644 index 00000000000..adefcdf8d59 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.darwin.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(common-arrow-interface) +target_link_libraries(common-arrow-interface PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + cpp-threading-future + providers-common-http_gateway +) +target_sources(common-arrow-interface PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp +) diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..35350df995e --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(common-arrow-interface) +target_link_libraries(common-arrow-interface PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + cpp-threading-future + providers-common-http_gateway +) +target_sources(common-arrow-interface PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp +) diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux.txt new file mode 100644 index 00000000000..35350df995e --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.linux.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(common-arrow-interface) +target_link_libraries(common-arrow-interface PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + cpp-threading-future + providers-common-http_gateway +) +target_sources(common-arrow-interface PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp +) diff --git a/ydb/library/yql/providers/common/arrow/interface/CMakeLists.txt b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/interface/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp new file mode 100644 index 00000000000..beae3d57b52 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp @@ -0,0 +1,20 @@ +#include "arrow_reader.h" + +namespace NYql { +TArrowFileDesc::TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format) + : Url(url), Gateway(gateway), Headers(headers), RetryPolicy(retryPolicy), Format(format), Size(size), IsLocal(url.StartsWith("file://")) +{ + +} + +IArrowReader::TSchemaResponse::TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups) : Schema(schema), NumRowGroups(numRowGroups) +{ + +} + +TArrowReaderSettings::TArrowReaderSettings(size_t poolSize) : PoolSize(poolSize) +{ + +} + +} diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h new file mode 100644 index 00000000000..4fb283377a4 --- /dev/null +++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h @@ -0,0 +1,49 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> +#include <util/generic/string.h> +#include <arrow/api.h> +#include <vector> +#include <memory> + +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> + +namespace NYql { +class TArrowFileDesc { +public: + TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format = ""); + TString Url; + IHTTPGateway::TPtr Gateway; + IHTTPGateway::THeaders Headers; + IRetryPolicy<long>::TPtr RetryPolicy; + TString Format; + size_t Size; + bool IsLocal; + TMaybe<TString> Contents; +}; + +class TArrowReaderSettings { +public: + explicit TArrowReaderSettings(size_t poolSize = 10); + size_t PoolSize; +}; + +class IArrowReader { +public: + using TPtr = std::shared_ptr<IArrowReader>; + + class TSchemaResponse { + public: + TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups); + std::shared_ptr<arrow::Schema> Schema; + int NumRowGroups; + }; + + virtual NThreading::TFuture<TSchemaResponse> GetSchema(const TArrowFileDesc& desc) const = 0; + virtual NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex, const std::vector<int>& columnIndices) const = 0; + virtual ~IArrowReader() = default; +}; + +IArrowReader::TPtr MakeArrowReader(const TArrowReaderSettings& settings); + +} diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt index 5aec2cacd7b..30b8f4cc172 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt @@ -28,11 +28,13 @@ target_link_libraries(providers-s3-actors PUBLIC cpp-string_utils-quote cpp-xml-document yql-minikql-computation - common-token_accessor-client - common-schema-mkql yql-public-types dq-actors-compute + common-token_accessor-client + common-schema-mkql providers-common-http_gateway + providers-common-arrow + common-arrow-interface providers-s3-common providers-s3-compressors providers-s3-proto diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt index c6206372321..6971d9cdc8c 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt @@ -29,11 +29,13 @@ target_link_libraries(providers-s3-actors PUBLIC cpp-string_utils-quote cpp-xml-document yql-minikql-computation - common-token_accessor-client - common-schema-mkql yql-public-types dq-actors-compute + common-token_accessor-client + common-schema-mkql providers-common-http_gateway + providers-common-arrow + common-arrow-interface providers-s3-common providers-s3-compressors providers-s3-proto diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt index c6206372321..6971d9cdc8c 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt @@ -29,11 +29,13 @@ target_link_libraries(providers-s3-actors PUBLIC cpp-string_utils-quote cpp-xml-document yql-minikql-computation - common-token_accessor-client - common-schema-mkql yql-public-types dq-actors-compute + common-token_accessor-client + common-schema-mkql providers-common-http_gateway + providers-common-arrow + common-arrow-interface providers-s3-common providers-s3-compressors providers-s3-proto diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 3f49dbd6fad..1171dcab809 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -136,6 +136,7 @@ struct TEvPrivate { EvFileFinished, EvPause, EvContinue, + EvFutureResolved, EvEnd }; @@ -160,6 +161,10 @@ struct TEvPrivate { IHTTPGateway::TCountedContent Result; }; + struct TEvFutureResolved : public TEventLocal<TEvFutureResolved, EvFutureResolved> { + TEvFutureResolved() {} + }; + struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> { explicit TEvReadStarted(long httpResponseCode) : HttpResponseCode(httpResponseCode) {} const long HttpResponseCode; @@ -525,6 +530,7 @@ struct TRetryStuff { , SizeLimit(sizeLimit) , TxId(txId) , RequestId(requestId) + , RetryState(retryPolicy->CreateRetryState()) , RetryPolicy(retryPolicy) , Cancelled(false) {} @@ -535,8 +541,8 @@ struct TRetryStuff { std::size_t Offset, SizeLimit; const TTxId TxId; const TString RequestId; - const IRetryPolicy<long>::TPtr RetryPolicy; IRetryPolicy<long>::IRetryState::TPtr RetryState; + IRetryPolicy<long>::TPtr RetryPolicy; IHTTPGateway::TCancelHook CancelHook; TMaybe<TDuration> NextRetryDelay; std::atomic_bool Cancelled; @@ -609,12 +615,15 @@ using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std:: class TArrowParquetBatchReader : public IBatchReader<std::shared_ptr<arrow::RecordBatch>> { public: - TArrowParquetBatchReader(std::unique_ptr<parquet::arrow::FileReader>&& fileReader, std::vector<int>&& columnIndices, std::vector<TColumnConverter>&& columnConverters) - : FileReader(std::move(fileReader)) + TArrowParquetBatchReader(TArrowFileDesc&& fileDesc, IArrowReader::TPtr arrowReader, int numRowGroups, std::vector<int>&& columnIndices, std::vector<TColumnConverter>&& columnConverters, std::function<void()>&& onFutureResolve, std::function<void()>&& waitForFutureResolve) + : FileDesc(std::move(fileDesc)) + , ArrowReader(arrowReader) , ColumnIndices(std::move(columnIndices)) , ColumnConverters(std::move(columnConverters)) - , TotalGroups(FileReader->num_row_groups()) + , TotalGroups(numRowGroups) , CurrentGroup(0) + , OnFutureResolve(std::move(onFutureResolve)) + , WaitForFutureResolve(std::move(waitForFutureResolve)) {} bool Next(std::shared_ptr<arrow::RecordBatch>& value) final { @@ -624,7 +633,12 @@ public: } if (!CurrentBatchReader) { - THROW_ARROW_NOT_OK(FileReader->ReadRowGroup(CurrentGroup++, ColumnIndices, &CurrentTable)); + auto future = ArrowReader->ReadRowGroup(FileDesc, CurrentGroup++, ColumnIndices); + future.Subscribe([this](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){ + OnFutureResolve(); + }); + WaitForFutureResolve(); + CurrentTable = future.GetValue(); CurrentBatchReader = std::make_unique<arrow::TableBatchReader>(*CurrentTable); } @@ -648,13 +662,16 @@ public: } private: - std::unique_ptr<parquet::arrow::FileReader> FileReader; + TArrowFileDesc FileDesc; + IArrowReader::TPtr ArrowReader; const std::vector<int> ColumnIndices; std::vector<TColumnConverter> ColumnConverters; const int TotalGroups; int CurrentGroup; std::shared_ptr<arrow::Table> CurrentTable; std::unique_ptr<arrow::TableBatchReader> CurrentBatchReader; + std::function<void()> OnFutureResolve; + std::function<void()> WaitForFutureResolve; }; ui64 GetSizeOfData(const arrow::ArrayData& data) { @@ -711,16 +728,16 @@ private: static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv; public: - TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, - const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, - const TString& path, const TString& url, const std::size_t maxBlocksInFly, + TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, + const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, + const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps) - : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), - TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), - PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), + : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), + TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), + PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader), DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps) {} @@ -730,6 +747,10 @@ public: } } + bool IsDownloadNeeded() const { + return !ReadSpec->Arrow || !ReadSpec->Compression.empty(); + } + bool Next(TString& value) { if (InputFinished) return false; @@ -880,25 +901,66 @@ private: TIssue exceptIssue; bool isLocal = Url.StartsWith("file://"); + bool needWaitFinish = !isLocal; try { - std::unique_ptr<NDB::ReadBuffer> buffer; - if (isLocal) { - buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path); - } else { - buffer = std::make_unique<TReadBufferFromStream>(this); - } - - const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); - YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); if (ReadSpec->Arrow) { - YQL_ENSURE(ReadSpec->Format == "parquet"); - std::unique_ptr<parquet::arrow::FileReader> fileReader; - auto arrowFile = NDB::asArrowFile(decompress ? *decompress : *buffer); + TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format); + if (IsDownloadNeeded()) { + // Read file entirely + std::unique_ptr<NDB::ReadBuffer> buffer; + if (isLocal) { + buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path); + } else { + buffer = std::make_unique<TReadBufferFromStream>(this); + } + + const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); + YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); + auto& readBuffer = decompress ? *decompress : *buffer; + TStringBuilder sb; + TBuffer buff(256_KB); + + while (!readBuffer.eof()) { + if (!readBuffer.hasPendingData()) { + if (!readBuffer.next()) { + break; + } + } + auto bytesReaded = readBuffer.read(buff.data(), 256_KB); + sb.append(buff.data(), buff.data() + bytesReaded); + } - THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(arrowFile, arrow::default_memory_pool(), &fileReader)); - std::shared_ptr<arrow::Schema> schema; - THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema)); + fileDesc.Contents = sb; + } else { + needWaitFinish = false; + } + auto actorSystem = GetActorSystem(); + auto onResolve = [actorSystem, actorId = this->SelfActorId] { + actorSystem->Send(new IEventHandle(actorId, actorId, new TEvPrivate::TEvFutureResolved())); + }; + auto waitForResolve = [&] { + auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); + TVector<THolder<IEventBase>> otherEvents; + while (!event->CastAsLocal<TEvPrivate::TEvFutureResolved>()) { + if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) { + throw TS3ReadAbort(); + } + otherEvents.push_back(event->ReleaseBase()); + event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); + } + + for (auto &e: otherEvents) { + Send(SelfActorId, e.Release()); + } + }; + auto future = ArrowReader->GetSchema(fileDesc); + future.Subscribe([onResolve](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) { + onResolve(); + }); + waitForResolve(); + auto result = future.GetValue(); + std::shared_ptr<arrow::Schema> schema = result.Schema; std::vector<int> columnIndices; std::vector<TColumnConverter> columnConverters; for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) { @@ -924,9 +986,25 @@ private: columnIndices.push_back(srcFieldIndex); } - TArrowParquetBatchReader reader(std::move(fileReader), std::move(columnIndices), std::move(columnConverters)); + TArrowParquetBatchReader reader(std::move(fileDesc), + ArrowReader, + result.NumRowGroups, + std::move(columnIndices), + std::move(columnConverters), + onResolve, + waitForResolve); ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal); } else { + std::unique_ptr<NDB::ReadBuffer> buffer; + if (isLocal) { + buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path); + } else { + buffer = std::make_unique<TReadBufferFromStream>(this); + } + + const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); + YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); + auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings)); TBlockReader reader(std::move(stream)); ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal); @@ -951,7 +1029,7 @@ private: RetryStuff->Cancel(); } - if (!isLocal) { + if (needWaitFinish) { WaitFinish(); } @@ -988,13 +1066,30 @@ private: auto selfActorId = SelfActorId; size_t cntBlocksInFly = 0; if (isLocal) { + auto waitProcessed = [&] { + auto event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); + TVector<THolder<IEventBase>> otherEvents; + while (!event->CastAsLocal<TEvPrivate::TEvBlockProcessed>()) { + if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) { + throw TS3ReadAbort(); + } + otherEvents.push_back(event->ReleaseBase()); + event = WaitForSpecificEvent<TEvPrivate::TEvFutureResolved, TEvPrivate::TEvBlockProcessed, NActors::TEvents::TEvPoison>(); + } + + for (auto& e: otherEvents) { + Send(SelfActorId, e.Release()); + } + }; + for (;;) { T batch; + if (!reader.Next(batch)) { break; } if (++cntBlocksInFly > MaxBlocksInFly) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + waitProcessed(); --cntBlocksInFly; } Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() { @@ -1002,7 +1097,7 @@ private: }, GetIngressDelta())); } while (cntBlocksInFly--) { - WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + waitProcessed(); } } else { for (;;) { @@ -1078,6 +1173,7 @@ private: std::size_t LastOffset = 0; TString LastData; std::size_t MaxBlocksInFly = 2; + IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; bool Paused = false; std::queue<THolder<IEventHandle>> DeferredEvents; @@ -1088,16 +1184,17 @@ private: class TS3ReadCoroActor : public TActorCoro { public: - TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize) + TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, bool isDownloadNeeded, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize) : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) , RetryStuff(std::move(retryStuff)) , PathIndex(pathIndex) + , IsDownloadNeeded(isDownloadNeeded) , HttpInflightSize(httpInflightSize) {} private: void Registered(TActorSystem* actorSystem, const TActorId& parent) override { TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself. - if (RetryStuff->Url.substr(0, 6) != "file://") { + if (IsDownloadNeeded && RetryStuff->Url.substr(0, 6) != "file://") { LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex, HttpInflightSize); } @@ -1105,6 +1202,7 @@ private: const TRetryStuff::TPtr RetryStuff; const size_t PathIndex; + const bool IsDownloadNeeded; const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; }; @@ -1124,6 +1222,7 @@ public: const NActors::TActorId& computeActorId, const IRetryPolicy<long>::TPtr& retryPolicy, const std::size_t maxBlocksInFly, + IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters @@ -1142,6 +1241,7 @@ public: , ReadSpec(readSpec) , Count(Paths.size()) , MaxBlocksInFly(maxBlocksInFly) + , ArrowReader(arrowReader) , Counters(counters) , TaskCounters(taskCounters) { @@ -1201,8 +1301,8 @@ public: HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream()); } ::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter; - auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps); - CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->HttpInflightSize).release())); + auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ArrowReader, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps); + CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->IsDownloadNeeded(), impl->HttpInflightSize).release())); } static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR"; @@ -1486,6 +1586,7 @@ private: std::deque<TReadyBlock> Blocks; ui32 Count; const std::size_t MaxBlocksInFly; + IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; size_t CurrentPathIndex = 0; mutable TInstant LastMemoryReport = TInstant::Now(); @@ -1654,6 +1755,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IRetryPolicy<long>::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, + IArrowReader::TPtr arrowReader, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters) { @@ -1769,7 +1871,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( maxBlocksInFly = FromString<ui64>(it->second); const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, - maxBlocksInFly, cfg, counters, taskCounters); + maxBlocksInFly, arrowReader, cfg, counters, taskCounters); return {actor, actor}; } else { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 528aec20c38..60c227cfece 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -5,6 +5,7 @@ #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> #include <ydb/library/yql/providers/s3/proto/source.pb.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> +#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h> #include <library/cpp/actors/core/actor.h> namespace NYql::NDq { @@ -24,6 +25,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IRetryPolicy<long>::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, + IArrowReader::TPtr arrowReader, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index 8339ab757a1..fef83090b94 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -16,14 +16,15 @@ void RegisterS3ReadActorFactory( IHTTPGateway::TPtr gateway, const IRetryPolicy<long>::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, - ::NMonitoring::TDynamicCounterPtr counters) { + ::NMonitoring::TDynamicCounterPtr counters, + IArrowReader::TPtr arrowReader) { #if defined(_linux_) || defined(_darwin_) NDB::registerFormats(); factory.RegisterSource<NS3::TSource>("S3Source", - [credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { + [credentialsFactory, gateway, retryPolicy, cfg, counters, arrowReader](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, - args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, + args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, arrowReader, counters, args.TaskCounters); }); #else diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 8a61b163961..60b1c0d4ff4 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -6,6 +6,7 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> +#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> #include <util/generic/size_literals.h> @@ -24,6 +25,7 @@ void RegisterS3ReadActorFactory( IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(), const TS3ReadActorFactoryConfig& = {}, - ::NMonitoring::TDynamicCounterPtr counters = nullptr); + ::NMonitoring::TDynamicCounterPtr counters = nullptr, + IArrowReader::TPtr arrowReader = MakeArrowReader(TArrowReaderSettings())); } diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt index 1716e4591ba..687d9330bdb 100644 --- a/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.darwin.txt @@ -26,6 +26,7 @@ target_link_libraries(providers-s3-compressors PUBLIC contrib-libs-lz4 contrib-libs-lzma contrib-libs-zstd + clickhouse_client_udf ) target_sources(providers-s3-compressors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt index 29593593d83..55350bf2db6 100644 --- a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux-aarch64.txt @@ -27,6 +27,7 @@ target_link_libraries(providers-s3-compressors PUBLIC contrib-libs-lz4 contrib-libs-lzma contrib-libs-zstd + clickhouse_client_udf ) target_sources(providers-s3-compressors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp diff --git a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt index 29593593d83..55350bf2db6 100644 --- a/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt @@ -27,6 +27,7 @@ target_link_libraries(providers-s3-compressors PUBLIC contrib-libs-lz4 contrib-libs-lzma contrib-libs-zstd + clickhouse_client_udf ) target_sources(providers-s3-compressors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp |