about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author vvvv <vvvv@ydb.tech> 2023-02-12 03:30:44 +0300
committer vvvv <vvvv@ydb.tech> 2023-02-12 03:30:44 +0300
commit fb07d31202e1f701940845b0a4899669ee8f0c28 (patch)
tree 766bef7eadeaefed042dae1a2d36b0f3969f3676
parent 0c02f9960488f7ba96f1c9bfa432420a9b634f0e (diff)
download ydb-fb07d31202e1f701940845b0a4899669ee8f0c28.tar.gz
reuse arrow file reader where possible
-rw-r--r-- ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp 24
-rw-r--r-- ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp 32
-rw-r--r-- ydb/library/yql/providers/common/arrow/interface/arrow_reader.h 23
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 1
4 files changed, 65 insertions, 15 deletions
diff --git a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
index e07a47b5aab..5ba0b2960c4 100644
--- a/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
+++ b/ydb/library/yql/providers/common/arrow/arrow_reader_impl.cpp
@@ -140,6 +140,14 @@ public:
std::shared_ptr<arrow::io::RandomAccessFile> ArrowFile;
};
+struct TArrowFileCookie {
+ TFileReaderWrapper Wrapper;
+
+ TArrowFileCookie(const TArrowFileDesc& desc)
+ : Wrapper(desc)
+ {}
+};
+
NThreading::TFuture<IArrowReader::TSchemaResponse> TArrowReader::GetSchema(const TArrowFileDesc& desc) const
{
auto promise = NThreading::NewPromise<TSchemaResponse>();
@@ -150,13 +158,16 @@ NThreading::TFuture<IArrowReader::TSchemaResponse> TArrowReader::GetSchema(const
if (!ThreadPool->AddFunc([desc, promise] () mutable {
YQL_ENSURE(desc.Format == "parquet");
try {
- TFileReaderWrapper wrapper(desc);
+ auto cookie = desc.Cookie;
+ if (!cookie) {
+ cookie = std::make_shared<TArrowFileCookie>(desc);
+ }
std::shared_ptr<arrow::Schema> schema;
- THROW_ARROW_NOT_OK(wrapper.FileReader->GetSchema(&schema));
+ THROW_ARROW_NOT_OK(cookie->Wrapper.FileReader->GetSchema(&schema));
- promise.SetValue(TSchemaResponse(schema, wrapper.FileReader->num_row_groups()));
+ promise.SetValue(TSchemaResponse(schema, cookie->Wrapper.FileReader->num_row_groups(), cookie));
} catch (const std::exception&) {
promise.SetException(std::current_exception());
}
@@ -177,11 +188,14 @@ NThreading::TFuture<std::shared_ptr<arrow::Table>> TArrowReader::ReadRowGroup(co
if (!ThreadPool->AddFunc([desc, promise, rowGroupIndex, columnIndices] () mutable {
YQL_ENSURE(desc.Format == "parquet");
try {
- TFileReaderWrapper wrapper(desc);
+ auto cookie = desc.Cookie;
+ if (!cookie) {
+ cookie = std::make_shared<TArrowFileCookie>(desc);
+ }
std::shared_ptr<arrow::Table> currentTable;
- THROW_ARROW_NOT_OK(wrapper.FileReader->ReadRowGroup(rowGroupIndex, columnIndices, &currentTable));
+ THROW_ARROW_NOT_OK(cookie->Wrapper.FileReader->ReadRowGroup(rowGroupIndex, columnIndices, &currentTable));
promise.SetValue(currentTable);
} catch (const std::exception&) {
diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
index beae3d57b52..68d75e34e37 100644
--- a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
+++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.cpp
@@ -1,20 +1,38 @@
#include "arrow_reader.h"
namespace NYql {
-TArrowFileDesc::TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format)
- : Url(url), Gateway(gateway), Headers(headers), RetryPolicy(retryPolicy), Format(format), Size(size), IsLocal(url.StartsWith("file://"))
+TArrowFileDesc::TArrowFileDesc(
+ const TString& url,
+ IHTTPGateway::TPtr gateway,
+ IHTTPGateway::THeaders headers,
+ const IRetryPolicy<long>::TPtr& retryPolicy,
+ size_t size,
+ const TString& format
+)
+ : Url(url)
+ , Gateway(gateway)
+ , Headers(headers)
+ , RetryPolicy(retryPolicy)
+ , Format(format)
+ , Size(size)
+ , IsLocal(url.StartsWith("file://"))
{
-
}
-IArrowReader::TSchemaResponse::TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups) : Schema(schema), NumRowGroups(numRowGroups)
+IArrowReader::TSchemaResponse::TSchemaResponse(
+ std::shared_ptr<arrow::Schema> schema,
+ int numRowGroups,
+ std::shared_ptr<TArrowFileCookie> cookie
+)
+ : Schema(schema)
+ , NumRowGroups(numRowGroups)
+ , Cookie(cookie)
{
-
}
-TArrowReaderSettings::TArrowReaderSettings(size_t poolSize) : PoolSize(poolSize)
+TArrowReaderSettings::TArrowReaderSettings(size_t poolSize)
+ : PoolSize(poolSize)
{
-
}
}
diff --git a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
index 4fb283377a4..556d584962d 100644
--- a/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
+++ b/ydb/library/yql/providers/common/arrow/interface/arrow_reader.h
@@ -9,9 +9,20 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
namespace NYql {
+
+struct TArrowFileCookie;
+
class TArrowFileDesc {
public:
- TArrowFileDesc(const TString& url, IHTTPGateway::TPtr gateway, IHTTPGateway::THeaders headers, const IRetryPolicy<long>::TPtr& retryPolicy, size_t size, const TString& format = "");
+ TArrowFileDesc(
+ const TString& url,
+ IHTTPGateway::TPtr gateway,
+ IHTTPGateway::THeaders headers,
+ const IRetryPolicy<long>::TPtr& retryPolicy,
+ size_t size,
+ const TString& format = ""
+ );
+
TString Url;
IHTTPGateway::TPtr Gateway;
IHTTPGateway::THeaders Headers;
@@ -20,6 +31,7 @@ public:
size_t Size;
bool IsLocal;
TMaybe<TString> Contents;
+ std::shared_ptr<TArrowFileCookie> Cookie;
};
class TArrowReaderSettings {
@@ -34,13 +46,18 @@ public:
class TSchemaResponse {
public:
- TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups);
+ TSchemaResponse(std::shared_ptr<arrow::Schema> schema, int numRowGroups, std::shared_ptr<TArrowFileCookie> cookie);
std::shared_ptr<arrow::Schema> Schema;
int NumRowGroups;
+ std::shared_ptr<TArrowFileCookie> Cookie;
};
virtual NThreading::TFuture<TSchemaResponse> GetSchema(const TArrowFileDesc& desc) const = 0;
- virtual NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup(const TArrowFileDesc& desc, int rowGroupIndex, const std::vector<int>& columnIndices) const = 0;
+ virtual NThreading::TFuture<std::shared_ptr<arrow::Table>> ReadRowGroup(
+ const TArrowFileDesc& desc,
+ int rowGroupIndex,
+ const std::vector<int>& columnIndices
+ ) const = 0;
virtual ~IArrowReader() = default;
};
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 1171dcab809..6ea49592220 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -986,6 +986,7 @@ private:
columnIndices.push_back(srcFieldIndex);
}
+ fileDesc.Cookie = result.Cookie;
TArrowParquetBatchReader reader(std::move(fileDesc),
ArrowReader,
result.NumRowGroups,