diff options
author | vvvv <vvvv@ydb.tech> | 2023-02-12 03:30:44 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-02-12 03:30:44 +0300 |
commit | fb07d31202e1f701940845b0a4899669ee8f0c28 (patch) | |
tree | 766bef7eadeaefed042dae1a2d36b0f3969f3676 | |
parent | 0c02f9960488f7ba96f1c9bfa432420a9b634f0e (diff) | |
download | ydb-fb07d31202e1f701940845b0a4899669ee8f0c28.tar.gz |
reuse arrow file reader where possible
4 files changed, 65 insertions, 15 deletions
diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp index e07a47b5aab..5ba0b2960c4 100644 --- a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp +++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp @@ -140,6 +140,14 @@ public: std::shared_ptr<arrow::io::RandomAccessFile> ArrowFile; }; +struct TArrowFileCookie { + TFileReaderWrapper Wrapper; + + TArrowFileCookie(const TArrowFileDesc& desc) + : Wrapper(desc) + {} +}; + NThreading::TFuture<IArrowReader::TSchemaResponse> TArrowReader::GetSchema(const TArrowFileDesc& desc) const { auto promise = NThreading::NewPromise<TSchemaResponse>(); @@ -150,13 +158,16 @@ NThreading::TFuture<IArrowReader::TSchemaResponse> TArrowReader::GetSchema(const if (!ThreadPool->AddFunc([desc, promise] () mutable { YQL_ENSURE(desc.Format == "parquet"); try { - TFileReaderWrapper wrapper(desc); + auto cookie = desc.Cookie; + if (!cookie) { + cookie = std::make_shared<TArrowFileCookie>(desc); + } std::shared_ptr<arrow::Schema> schema; - THROW_ARROW_NOT_OK(wrapper.FileReader->GetSchema(&schema)); + THROW_ARROW_NOT_OK(cookie->Wrapper.FileReader->GetSchema(&schema)); - promise.SetValue(TSchemaResponse(schema, wrapper.FileReader->num_row_groups())); + promise.SetValue(TSchemaResponse(schema, cookie->Wrapper.FileReader->num_row_groups(), cookie)); } catch (const std::exception&) { promise.SetException(std::current_exception()); } @@ -177,11 +188,14 @@ NThreading::TFuture<std::shared_ptr<arrow::Table>> TArrowReader::ReadRowGroup(co if (!ThreadPool->AddFunc([desc, promise, rowGroupIndex, columnIndices] () mutable { YQL_ENSURE(desc.Format == "parquet"); try { - TFileReaderWrapper wrapper(desc); + auto cookie = desc.Cookie; + if (!cookie) { + cookie = std::make_shared<TArrowFileCookie>(desc); + } std::shared_ptr<arrow::Table> currentTable; - THROW_ARROW_NOT_OK(wrapper.FileReader->ReadRowGroup(rowGroupIndex, columnIndices, &currentTable)); + 
THROW_ARROW_NOT_OK(cookie->Wrapper.FileReader->ReadRowGroup(rowGroupIndex, columnIndices, &currentTable)); promise.SetValue(currentTable); } catch (const std::exception&) { diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp index beae3d57b52..68d75e34e37 100644 --- a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp +++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp @@ -1,20 +1,38 @@ #include "arrow_reader.h" namespace NYql { -TArrowFileDesc::TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format) - : Url(url), Gateway(gateway), Headers(headers), RetryPolicy(retryPolicy), Format(format), Size(size), IsLocal(url.StartsWith("file://")) +TArrowFileDesc::TArrowFileDesc( + const TString& url, + IHTTPGateway::TPtr gateway, + IHTTPGateway::THeaders headers, + const IRetryPolicy<long>::TPtr& retryPolicy, + size_t size, + const TString& format +) + : Url(url) + , Gateway(gateway) + , Headers(headers) + , RetryPolicy(retryPolicy) + , Format(format) + , Size(size) + , IsLocal(url.StartsWith("file://")) { - } -IArrowReader::TSchemaResponse::TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups) : Schema(schema), NumRowGroups(numRowGroups) +IArrowReader::TSchemaResponse::TSchemaResponse( + std::shared_ptr<arrow::Schema> schema, + int numRowGroups, + std::shared_ptr<TArrowFileCookie> cookie +) + : Schema(schema) + , NumRowGroups(numRowGroups) + , Cookie(cookie) { - } -TArrowReaderSettings::TArrowReaderSettings(size_t poolSize) : PoolSize(poolSize) +TArrowReaderSettings::TArrowReaderSettings(size_t poolSize) + : PoolSize(poolSize) { - } } diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h index 4fb283377a4..556d584962d 100644 --- 
a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h +++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h @@ -9,9 +9,20 @@ #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> namespace NYql { + +struct TArrowFileCookie; + class TArrowFileDesc { public: - TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format = ""); + TArrowFileDesc( + const TString& url, + IHTTPGateway::TPtr gateway, + IHTTPGateway::THeaders headers, + const IRetryPolicy<long>::TPtr& retryPolicy, + size_t size, + const TString& format = "" + ); + TString Url; IHTTPGateway::TPtr Gateway; IHTTPGateway::THeaders Headers; @@ -20,6 +31,7 @@ public: size_t Size; bool IsLocal; TMaybe<TString> Contents; + std::shared_ptr<TArrowFileCookie> Cookie; }; class TArrowReaderSettings { @@ -34,13 +46,18 @@ public: class TSchemaResponse { public: - TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups); + TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups, std::shared_ptr<TArrowFileCookie> cookie); std::shared_ptr<arrow::Schema> Schema; int NumRowGroups; + std::shared_ptr<TArrowFileCookie> Cookie; }; virtual NThreading::TFuture<TSchemaResponse> GetSchema(const TArrowFileDesc& desc) const = 0; - virtual NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex, const std::vector<int>& columnIndices) const = 0; + virtual NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup( + const TArrowFileDesc& desc, + int rowGroupIndex, + const std::vector<int>& columnIndices + ) const = 0; virtual ~IArrowReader() = default; }; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 1171dcab809..6ea49592220 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ 
b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -986,6 +986,7 @@ private: columnIndices.push_back(srcFieldIndex); } + fileDesc.Cookie = result.Cookie; TArrowParquetBatchReader reader(std::move(fileDesc), ArrowReader, result.NumRowGroups, |