aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-03-01 20:20:07 +0300
committerhor911 <hor911@ydb.tech>2023-03-01 20:20:07 +0300
commit4217c0e360c22b33763317089141ab196386c21b (patch)
treed72e4f9baa074d45bcf9752bfcf94d2c3f63b353
parent23f0afc1966496d0937ee30f2aba19e77f93d17f (diff)
downloadydb-4217c0e360c22b33763317089141ab196386c21b.tar.gz
Cororead refactored
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp7
-rw-r--r--ydb/core/yq/libs/config/protos/read_actors_factory.proto1
-rw-r--r--ydb/core/yq/libs/init/init.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h4
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp10
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp983
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json5
-rw-r--r--ydb/library/yql/providers/s3/proto/source.proto1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp3
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp7
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h1
13 files changed, 572 insertions, 455 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 296418df1f..714a696f2c 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -1661,6 +1661,10 @@ private:
*gatewaysConfig.MutableS3() = Params.Config.GetGateways().GetS3();
gatewaysConfig.MutableS3()->ClearClusterMapping();
+ auto* attr = gatewaysConfig.MutableS3()->MutableDefaultSettings()->Add();
+ attr->SetName("ArrowThreadPool");
+ attr->SetValue("false");
+
THashMap<TString, TString> clusters;
TString monitoringEndpoint = Params.Config.GetCommon().GetMonitoringEndpoint();
@@ -1698,7 +1702,8 @@ private:
}
{
- dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory));
+ dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
+ Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles()));
}
{
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
index 013fbaeb84..3fa3fdcd15 100644
--- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto
+++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
@@ -13,6 +13,7 @@ message TS3ReadActorFactoryConfig {
uint64 RowsInBatch = 2; // Default = 1000
uint64 MaxInflight = 3; // Default = 20
uint64 DataInflight = 4; // Default = 200 MB
+ bool AllowLocalFiles = 5;
}
message TPqReadActorFactoryConfig {
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 049a77b401..f90db4d998 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -164,7 +164,7 @@ void Init(
if (protoConfig.GetPrivateApi().GetEnabled()) {
const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
- auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(s3readConfig.GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one
+ auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::Max());
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg;
if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
readActorFactoryCfg.RowsInBatch = rowsInBatch;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 8dc0d00ae0..169815a33b 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1627,7 +1627,7 @@ protected:
Y_VERIFY(!TaskRunner || info.Buffer);
if (info.Finished) {
- CA_LOG_D("Skip polling async input[" << inputIndex << "]: finished");
+ CA_LOG_T("Skip polling async input[" << inputIndex << "]: finished");
return;
}
@@ -1670,7 +1670,7 @@ protected:
AsyncInputPush(std::move(batch), info, space, finished);
} else {
- CA_LOG_D("Skip polling async input[" << inputIndex << "]: no free space: " << freeSpace);
+ CA_LOG_T("Skip polling async input[" << inputIndex << "]: no free space: " << freeSpace);
ContinueExecute(); // If there is no free space in buffer, => we have something to process
}
}
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 29a05254a6..6dba58a1d8 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -411,16 +411,18 @@ private:
OnFinish(TIssues{error});
}
- void MaybeStart() {
+ void MaybeStart(long httpResponseCode = 0) {
if (!HttpResponseCode) {
- curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &HttpResponseCode);
+ if (!httpResponseCode) {
+ curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
+ }
+ HttpResponseCode = httpResponseCode;
OnStart(HttpResponseCode);
}
}
void Done(CURLcode result, long httpResponseCode) final {
- HttpResponseCode = httpResponseCode;
- OnStart(HttpResponseCode);
+ MaybeStart(httpResponseCode);
if (CURLE_OK != result) {
return Fail(TIssue(TStringBuilder{} << "error: " << curl_easy_strerror(result) << " detailed: " << GetDetailedErrorText()));
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index af9ebb83be..1951d4ec1b 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -26,6 +26,7 @@
#include <arrow/io/api.h>
#include <arrow/compute/cast.h>
#include <arrow/status.h>
+#include <arrow/util/future.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
@@ -68,6 +69,7 @@
#include <util/generic/size_literals.h>
#include <util/stream/format.h>
+#include <util/system/fstat.h>
#include <queue>
@@ -80,24 +82,29 @@
#define LOG_E(name, stream) \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
#define LOG_W(name, stream) \
- LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
+ LOG_WARN_S (*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
#define LOG_I(name, stream) \
- LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
+ LOG_INFO_S (*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
#define LOG_D(name, stream) \
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
#define LOG_T(name, stream) \
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream)
-#define LOG_CORO_E(name, stream) \
- LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
-#define LOG_CORO_W(name, stream) \
- LOG_WARN_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
-#define LOG_CORO_I(name, stream) \
- LOG_INFO_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
-#define LOG_CORO_D(name, stream) \
- LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
-#define LOG_CORO_T(name, stream) \
- LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream)
+#define LOG_CORO_E(stream) \
+ LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \
+ << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream)
+#define LOG_CORO_W(stream) \
+ LOG_WARN_S (GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \
+ << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream)
+#define LOG_CORO_I(stream) \
+ LOG_INFO_S (GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \
+ << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream)
+#define LOG_CORO_D(stream) \
+ LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \
+ << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream)
+#define LOG_CORO_T(stream) \
+ LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \
+ << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream)
#define THROW_ARROW_NOT_OK(status) \
do \
@@ -155,6 +162,7 @@ struct TEvPrivate {
EvFutureResolved,
EvObjectPathBatch,
EvObjectPathReadError,
+ EvReadResult2,
EvEnd
};
@@ -271,6 +279,15 @@ struct TEvPrivate {
TIssues Issues;
TEvObjectPathReadError(TIssues issues) : Issues(std::move(issues)) { }
};
+
+ struct TEvReadResult2 : public TEventLocal<TEvReadResult2, EvReadResult2> {
+ TEvReadResult2(IHTTPGateway::TContent&& result) : Failure(false), Result(std::move(result)) { }
+ TEvReadResult2(TIssues&& issues) : Failure(true), Result(""), Issues(std::move(issues)) { }
+ const bool Failure;
+ IHTTPGateway::TContent Result;
+ const TIssues Issues;
+ };
+
};
using namespace NKikimr::NMiniKQL;
@@ -976,6 +993,7 @@ struct TReadSpec {
using TPtr = std::shared_ptr<TReadSpec>;
bool Arrow = false;
+ bool ThreadPool = false;
std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec;
NDB::ColumnsWithTypeAndName CHColumns;
std::shared_ptr<arrow::Schema> ArrowSchema;
@@ -1004,7 +1022,6 @@ struct TRetryStuff {
, SizeLimit(sizeLimit)
, TxId(txId)
, RequestId(requestId)
- , RetryState(retryPolicy->CreateRetryState())
, RetryPolicy(retryPolicy)
, Cancelled(false)
{}
@@ -1062,92 +1079,8 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste
inflightCounter);
}
-template <typename T>
-class IBatchReader {
-public:
- virtual ~IBatchReader() = default;
-
- virtual bool Next(T& value) = 0;
-};
-
-class TBlockReader : public IBatchReader<NDB::Block> {
-public:
- TBlockReader(std::unique_ptr<NDB::IBlockInputStream>&& stream)
- : Stream(std::move(stream))
- {}
-
- bool Next(NDB::Block& value) final {
- value = Stream->read();
- return !!value;
- }
-
-private:
- std::unique_ptr<NDB::IBlockInputStream> Stream;
-};
-
using TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>;
-class TArrowParquetBatchReader : public IBatchReader<std::shared_ptr<arrow::RecordBatch>> {
-public:
- TArrowParquetBatchReader(TArrowFileDesc&& fileDesc, IArrowReader::TPtr arrowReader, int numRowGroups, std::vector<int>&& columnIndices, std::vector<TColumnConverter>&& columnConverters, std::function<void()>&& onFutureResolve, std::function<void()>&& waitForFutureResolve)
- : FileDesc(std::move(fileDesc))
- , ArrowReader(arrowReader)
- , ColumnIndices(std::move(columnIndices))
- , ColumnConverters(std::move(columnConverters))
- , TotalGroups(numRowGroups)
- , CurrentGroup(0)
- , OnFutureResolve(std::move(onFutureResolve))
- , WaitForFutureResolve(std::move(waitForFutureResolve))
- {}
-
- bool Next(std::shared_ptr<arrow::RecordBatch>& value) final {
- for (;;) {
- if (CurrentGroup == TotalGroups) {
- return false;
- }
-
- if (!CurrentBatchReader) {
- auto future = ArrowReader->ReadRowGroup(FileDesc, CurrentGroup++, ColumnIndices);
- future.Subscribe([f = OnFutureResolve](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){
- f();
- });
- WaitForFutureResolve();
- CurrentTable = future.GetValue();
- CurrentBatchReader = std::make_unique<arrow::TableBatchReader>(*CurrentTable);
- }
-
- THROW_ARROW_NOT_OK(CurrentBatchReader->ReadNext(&value));
- if (value) {
- auto columns = value->columns();
- for (size_t i = 0; i < ColumnConverters.size(); ++i) {
- auto converter = ColumnConverters[i];
- if (converter) {
- columns[i] = converter(columns[i]);
- }
- }
-
- value = arrow::RecordBatch::Make(value->schema(), value->num_rows(), columns);
- return true;
- }
-
- CurrentBatchReader = nullptr;
- CurrentTable = nullptr;
- }
- }
-
-private:
- TArrowFileDesc FileDesc;
- IArrowReader::TPtr ArrowReader;
- const std::vector<int> ColumnIndices;
- std::vector<TColumnConverter> ColumnConverters;
- const int TotalGroups;
- int CurrentGroup;
- std::shared_ptr<arrow::Table> CurrentTable;
- std::unique_ptr<arrow::TableBatchReader> CurrentBatchReader;
- std::function<void()> OnFutureResolve;
- std::function<void()> WaitForFutureResolve;
-};
-
template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) {
::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
@@ -1173,7 +1106,7 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::
}
TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) {
- if (originalType->id() == arrow::Type::DATE32) {
+if (originalType->id() == arrow::Type::DATE32) {
// TODO: support more than 1 optional level
bool isOptional = false;
auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
@@ -1199,6 +1132,7 @@ TColumnConverter BuildColumnConverter(const std::string& columnName, const std::
YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: "
<< targetType->ToString() << ", got: " << originalType->ToString());
+
return [targetType](const std::shared_ptr<arrow::Array>& value) {
auto res = arrow::compute::Cast(*value, targetType);
THROW_ARROW_NOT_OK(res.status());
@@ -1206,341 +1140,437 @@ TColumnConverter BuildColumnConverter(const std::string& columnName, const std::
};
}
+std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::RecordBatch> batch, std::vector<TColumnConverter>& columnConverters) {
+ auto columns = batch->columns();
+ for (size_t i = 0; i < columnConverters.size(); ++i) {
+ auto converter = columnConverters[i];
+ if (converter) {
+ columns[i] = converter(columns[i]);
+ }
+ }
+ return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns);
+}
+
class TS3ReadCoroImpl : public TActorCoroImpl {
friend class TS3StreamReadActor;
-private:
- class TReadBufferFromStream : public NDB::ReadBuffer {
+
+public:
+
+ class THttpRandomAccessFile : public arrow::io::RandomAccessFile {
public:
- TReadBufferFromStream(TS3ReadCoroImpl* coro)
- : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro)
- {}
+ THttpRandomAccessFile(TS3ReadCoroImpl* coro, size_t fileSize) : Coro(coro), FileSize(fileSize) {
+ }
+
+ // has no meaning and use
+ arrow::Result<int64_t> GetSize() override { return FileSize; }
+ arrow::Result<int64_t> Tell() const override { return InnerPos; }
+ arrow::Status Seek(int64_t position) override { InnerPos = position; return {}; }
+ arrow::Status Close() override { return {}; }
+ bool closed() const override { return false; }
+ // must not be used currently
+ arrow::Result<int64_t> Read(int64_t, void*) override {
+ Y_VERIFY(0);
+ return arrow::Result<int64_t>();
+ }
+ arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t) override {
+ Y_VERIFY(0);
+ return arrow::Result<std::shared_ptr<arrow::Buffer>>();
+ }
+ arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(const arrow::io::IOContext&, int64_t, int64_t) override {
+ Y_VERIFY(0);
+ return arrow::Future<std::shared_ptr<arrow::Buffer>>();
+ }
+ // the only useful
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
+ return Coro->ReadAt(position, nbytes);
+ }
+ // future work
+ arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>&) override { return{}; }
+
+ private:
+ TS3ReadCoroImpl *const Coro;
+ const size_t FileSize;
+ int64_t InnerPos = 0;
+ };
+
+ class TRandomAccessFileTrafficCounter : public arrow::io::RandomAccessFile {
+ public:
+ TRandomAccessFileTrafficCounter(TS3ReadCoroImpl* coro, std::shared_ptr<arrow::io::RandomAccessFile> impl)
+ : Coro(coro), Impl(impl) {
+ }
+ arrow::Result<int64_t> GetSize() override { return Impl->GetSize(); }
+ virtual arrow::Result<int64_t> Tell() const override { return Impl->Tell(); }
+ virtual arrow::Status Seek(int64_t position) override { return Impl->Seek(position); }
+ virtual arrow::Status Close() override { return Impl->Close(); }
+ virtual bool closed() const override { return Impl->closed(); }
+ arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override { return Impl->Read(nbytes, buffer); }
+ arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override { return Impl->Read(nbytes); }
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
+ auto result = Impl->ReadAt(position, nbytes);
+ Coro->IngressBytes += nbytes;
+ return result;
+ }
+ arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& ranges) override { return Impl->WillNeed(ranges); }
+
+ private:
+ TS3ReadCoroImpl *const Coro;
+ std::shared_ptr<arrow::io::RandomAccessFile> Impl;
+ };
+
+ class TCoroReadBuffer : public NDB::ReadBuffer {
+ public:
+ TCoroReadBuffer(TS3ReadCoroImpl* coro)
+ : NDB::ReadBuffer(nullptr, 0ULL)
+ , Coro(coro)
+ { }
private:
bool nextImpl() final {
- while (Coro->Next(Value)) {
- if (!Value.empty()) {
- working_buffer = NDB::BufferBase::Buffer(const_cast<char*>(Value.data()), const_cast<char*>(Value.data()) + Value.size());
+ while (!Coro->InputFinished || !Coro->DeferredDataParts.empty()) {
+ Coro->ProcessOneEvent();
+ if (Coro->InputBuffer) {
+ WorkingBuffer.swap(Coro->InputBuffer);
+ Coro->InputBuffer.clear();
+ auto rawData = const_cast<char*>(WorkingBuffer.data());
+ working_buffer = NDB::BufferBase::Buffer(rawData, rawData + WorkingBuffer.size());
return true;
}
}
return false;
}
-
TS3ReadCoroImpl *const Coro;
- TString Value;
+ TString WorkingBuffer;
};
- static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
-public:
- TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
- const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
- const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader,
- const TS3ReadActorFactoryConfig& readActorFactoryCfg,
- const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
- const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
- const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps)
- : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
- TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
- PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader),
- DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps)
- {}
+ void RunClickHouseParserOverHttp() {
- ~TS3ReadCoroImpl() override {
- if (DeferredEvents.size() && DeferredQueueSize) {
- DeferredQueueSize->Sub(DeferredEvents.size());
+ LOG_CORO_D("RunClickHouseParserOverHttp");
+
+ std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
+ std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
+ NDB::ReadBuffer* buffer = coroBuffer.get();
+
+ if (ReadSpec->Compression) {
+ decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
+ YQL_ENSURE(buffer, "Unsupported " << ReadSpec->Compression << " compression.");
+ buffer = decompressorBuffer.get();
}
- }
- bool IsDownloadNeeded() const {
- return !ReadSpec->Arrow || !ReadSpec->Compression.empty();
+ DownloadStart(RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize);
+
+ auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(
+ NDB::FormatFactory::instance().getInputFormat(
+ ReadSpec->Format, *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings
+ )
+ );
+
+ while (NDB::Block batch = stream->read()) {
+ Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta()));
+ }
+
+ LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED");
}
- bool Next(TString& value) {
- if (InputFinished)
- return false;
+ void RunClickHouseParserOverFile() {
- if (Paused || DeferredEvents.empty()) {
- auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted
- , TEvPrivate::TEvDataPart
- , TEvPrivate::TEvReadFinished
- , TEvPrivate::TEvPause
- , TEvPrivate::TEvContinue
- , NActors::TEvents::TEvPoison>();
+ LOG_CORO_D("RunClickHouseParserOverFile");
- switch (const auto etype = ev->GetTypeRewrite()) {
- case TEvPrivate::TEvPause::EventType:
- Paused = true;
- break;
- case TEvPrivate::TEvContinue::EventType:
- Paused = false;
- break;
- case NActors::TEvents::TEvPoison::EventType:
- RetryStuff->Cancel();
- throw TS3ReadAbort();
- default:
- DeferredEvents.push(std::move(ev));
- if (DeferredQueueSize) {
- DeferredQueueSize->Inc();
- }
- break;
- }
+ TString fileName = Url.substr(7) + Path;
+
+ std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<NDB::ReadBufferFromFile>(fileName);
+ std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
+ NDB::ReadBuffer* buffer = coroBuffer.get();
+
+ if (ReadSpec->Compression) {
+ decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
+ YQL_ENSURE(buffer, "Unsupported " << ReadSpec->Compression << " compression.");
+ buffer = decompressorBuffer.get();
}
- if (Paused || DeferredEvents.empty()) {
- value.clear();
- return true;
+ auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(
+ NDB::FormatFactory::instance().getInputFormat(
+ ReadSpec->Format, *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings
+ )
+ );
+
+ auto actorSystem = GetActorSystem();
+ auto selfId = SelfActorId;
+ size_t cntBlocksInFly = 0;
+
+ while (NDB::Block batch = stream->read()) {
+ if (++cntBlocksInFly > MaxBlocksInFly) {
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ --cntBlocksInFly;
+ }
+ Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() {
+ actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed()));
+ }, TakeIngressDelta()));
}
+ while (cntBlocksInFly--) {
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ }
+
+ IngressBytes += GetFileLength(fileName);
- THolder<IEventHandle> ev;
- ev.Swap(DeferredEvents.front());
- DeferredEvents.pop();
- if (DeferredQueueSize) {
- DeferredQueueSize->Dec();
+ LOG_CORO_D("RunClickHouseParserOverFile FINISHED");
+ }
+
+ void BuildColumnConverters(std::shared_ptr<arrow::Schema> outputSchema, std::shared_ptr<arrow::Schema> dataSchema,
+ std::vector<int>& columnIndices, std::vector<TColumnConverter>& columnConverters) {
+ columnConverters.reserve(outputSchema->num_fields());
+ for (int i = 0; i < outputSchema->num_fields(); ++i) {
+ const auto& targetField = outputSchema->field(i);
+ auto srcFieldIndex = dataSchema->GetFieldIndex(targetField->name());
+ YQL_ENSURE(srcFieldIndex != -1, "Missing field: " << targetField->name());
+ auto targetType = targetField->type();
+ auto originalType = dataSchema->field(srcFieldIndex)->type();
+ YQL_ENSURE(!originalType->layout().has_dictionary, "Unsupported dictionary encoding is used for field: "
+ << targetField->name() << ", type: " << originalType->ToString());
+ columnIndices.push_back(srcFieldIndex);
+ auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
+ YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
+ columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
}
+ }
- switch (const auto etype = ev->GetTypeRewrite()) {
- case TEvPrivate::TEvReadStarted::EventType:
- ErrorText.clear();
- Issues.Clear();
- value.clear();
- RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode);
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", Http code: " << HttpResponseCode << ", retry after: " << RetryStuff->NextRetryDelay << ", request id: [" << RetryStuff->RequestId << "]");
- if (!RetryStuff->NextRetryDelay) { // Success or not retryable
- RetryStuff->RetryState = nullptr;
- }
- return true;
- case TEvPrivate::TEvReadFinished::EventType:
- Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues);
+ void RunThreadPoolBlockArrowParser() {
- if (HttpResponseCode >= 300) {
- ServerReturnedError = true;
- Issues.AddIssue(TIssue{TStringBuilder() << "HTTP error code: " << HttpResponseCode});
- }
+ LOG_CORO_D("RunThreadPoolBlockArrowParser");
- if (Issues) {
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished. Url: " << RetryStuff->Url << ". Issues: " << Issues.ToOneLineString());
- if (!RetryStuff->NextRetryDelay) {
- InputFinished = true;
- LOG_CORO_W("TS3ReadCoroImpl", "ReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]");
- throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
- }
- }
+ TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format);
- if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) {
- LOG_CORO_D("TS3ReadCoroImpl", "Retry Download in " << RetryStuff->NextRetryDelay << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "], Issues: " << Issues.ToOneLineString());
- GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize))));
- value.clear();
- } else {
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", request id: [" << RetryStuff->RequestId << "]");
- InputFinished = true;
- if (ServerReturnedError) {
- throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
- }
- return false; // end of data (real data, not an error)
- }
- return true;
- case TEvPrivate::TEvDataPart::EventType:
- if (HttpDataRps) {
- HttpDataRps->Inc();
- }
- if (200L == HttpResponseCode || 206L == HttpResponseCode) {
- value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
- IngressBytes += value.size();
- RetryStuff->Offset += value.size();
- RetryStuff->SizeLimit -= value.size();
- LastOffset = RetryStuff->Offset;
- LastData = value;
- LOG_CORO_T("TS3ReadCoroImpl", "TEvDataPart, size: " << value.size() << ", Url: " << RetryStuff->Url << ", Offset (updated): " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]");
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
- } else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) {
- ServerReturnedError = true;
- if (ErrorText.size() < 256_KB)
- ErrorText.append(ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract());
- else if (!ErrorText.EndsWith(TruncatedSuffix))
- ErrorText.append(TruncatedSuffix);
- value.clear();
- LOG_CORO_W("TS3ReadCoroImpl", "TEvDataPart, ERROR: " << ErrorText << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]");
- }
- return true;
- default:
- return false;
+ auto actorSystem = GetActorSystem();
+ auto selfId = SelfActorId;
+
+ auto future = ArrowReader->GetSchema(fileDesc);
+ future.Subscribe([actorSystem, selfId](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) {
+ actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved()));
+ });
+ WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>();
+ auto result = future.GetValue();
+
+ fileDesc.Cookie = result.Cookie;
+
+ std::shared_ptr<arrow::Schema> schema = result.Schema;
+ std::vector<int> columnIndices;
+ std::vector<TColumnConverter> columnConverters;
+
+ BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
+
+ for (int group = 0; group < result.NumRowGroups; group++) {
+
+ auto future = ArrowReader->ReadRowGroup(fileDesc, group, columnIndices);
+ future.Subscribe([actorSystem, selfId](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){
+ actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved()));
+ });
+ WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>();
+ auto table = future.GetValue();
+ auto reader = std::make_unique<arrow::TableBatchReader>(*table);
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ ::arrow::Status status;
+ while (status = reader->ReadNext(&batch), status.ok() && batch) {
+ Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
+ ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta()
+ ));
+ }
+ if (!status.ok()) {
+ throw yexception() << status.ToString();
+ }
}
+
+ LOG_CORO_D("RunThreadPoolBlockArrowParser FINISHED");
}
-private:
- ui64 GetIngressDelta() {
- auto currentIngressBytes = IngressBytes;
- IngressBytes = 0;
- return currentIngressBytes;
+
+ arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) {
+ RetryStuff->Gateway->Download(Url + Path, RetryStuff->Headers,
+ position,
+ nbytes,
+ std::bind(&OnResult, GetActorSystem(), SelfActorId, std::placeholders::_1),
+ {},
+ RetryStuff->RetryPolicy);
+ auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>();
+ if (ev->Get()->Failure) {
+ throw yexception() << ev->Get()->Issues.ToOneLineString();
+ } else {
+ auto result = ev->Get()->Result.Extract();
+ IngressBytes += nbytes;
+ return arrow::Buffer::FromString(result);
+ }
}
- void WaitFinish() {
- LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish: " << Path);
- if (InputFinished)
+ static void OnResult(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) {
+ switch (result.index()) {
+ case 0U:
+ actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvReadResult2(std::get<IHTTPGateway::TContent>(std::move(result)))));
return;
-
- while (true) {
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted
- , TEvPrivate::TEvDataPart
- , TEvPrivate::TEvReadFinished
- , TEvPrivate::TEvPause
- , TEvPrivate::TEvContinue
- , NActors::TEvents::TEvPoison>();
-
- const auto etype = ev->GetTypeRewrite();
- switch (etype) {
- case TEvPrivate::TEvReadFinished::EventType:
- Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues);
- LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Path << " " << Issues.ToOneLineString());
- break;
- case NActors::TEvents::TEvPoison::EventType:
- RetryStuff->Cancel();
- throw TS3ReadAbort();
- default:
- continue;
- }
- InputFinished = true;
+ case 1U:
+ actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvReadResult2(std::get<TIssues>(std::move(result)))));
return;
+ default:
+ break;
}
}
- template<typename EvType>
- void WaitEvent() {
- auto event = WaitForEvent();
- TVector<THolder<IEventBase>> otherEvents;
- while (!event->CastAsLocal<EvType>()) {
- if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) {
- throw TS3ReadAbort();
+ void RunCoroBlockArrowParserOverHttp() {
+
+ LOG_CORO_D("RunCoroBlockArrowParserOverHttp");
+
+ std::shared_ptr<arrow::io::RandomAccessFile> arrowFile = std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit);
+ std::unique_ptr<parquet::arrow::FileReader> fileReader;
+ THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(arrowFile, arrow::default_memory_pool(), &fileReader));
+
+ std::shared_ptr<arrow::Schema> schema;
+ THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
+ std::vector<int> columnIndices;
+ std::vector<TColumnConverter> columnConverters;
+
+ BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
+
+ for (int group = 0; group < fileReader->num_row_groups(); group++) {
+
+ std::shared_ptr<arrow::Table> table;
+ THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
+
+ auto reader = std::make_unique<arrow::TableBatchReader>(*table);
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ ::arrow::Status status;
+ while (status = reader->ReadNext(&batch), status.ok() && batch) {
+ Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
+ ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta()
+ ));
}
-
- if (!event->CastAsLocal<TEvPrivate::TEvPause>() && !event->CastAsLocal<TEvPrivate::TEvContinue>() && !event->CastAsLocal<TEvPrivate::TEvReadFinished>()) {
- otherEvents.push_back(event->ReleaseBase());
+ if (!status.ok()) {
+ throw yexception() << status.ToString();
}
-
- event = WaitForEvent();
}
- for (auto& e: otherEvents) {
- Send(SelfActorId, e.Release());
- }
+ LOG_CORO_D("RunCoroBlockArrowParserOverHttp - FINISHED");
}
- void Run() final try {
+ void RunCoroBlockArrowParserOverFile() {
- LOG_CORO_D("TS3ReadCoroImpl", "Run" << ", Path: " << Path);
+ LOG_CORO_D("RunCoroBlockArrowParserOverFile");
- NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
+ std::shared_ptr<arrow::io::RandomAccessFile> arrowFile =
+ std::make_shared<TRandomAccessFileTrafficCounter>(this,
+ arrow::io::ReadableFile::Open((Url + Path).substr(7), arrow::default_memory_pool()).ValueOrDie()
+ );
+ std::unique_ptr<parquet::arrow::FileReader> fileReader;
+ parquet::arrow::FileReaderBuilder builder;
+ builder.memory_pool(arrow::default_memory_pool());
+ parquet::ArrowReaderProperties properties;
+ properties.set_pre_buffer(true);
+ builder.properties(properties);
+ THROW_ARROW_NOT_OK(builder.Open(arrowFile));
+ THROW_ARROW_NOT_OK(builder.Build(&fileReader));
- TIssue exceptIssue;
- bool isLocal = Url.StartsWith("file://");
- bool needWaitFinish = !isLocal;
- try {
- if (ReadSpec->Arrow) {
- TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format);
- if (IsDownloadNeeded()) {
- // Read file entirely
- std::unique_ptr<NDB::ReadBuffer> buffer;
- if (isLocal) {
- buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path);
- } else {
- buffer = std::make_unique<TReadBufferFromStream>(this);
- }
+ std::shared_ptr<arrow::Schema> schema;
+ THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema));
+ std::vector<int> columnIndices;
+ std::vector<TColumnConverter> columnConverters;
- const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
- YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
- auto& readBuffer = decompress ? *decompress : *buffer;
- TStringBuilder sb;
- TBuffer buff(256_KB);
-
- while (!readBuffer.eof()) {
- if (!readBuffer.hasPendingData()) {
- if (!readBuffer.next()) {
- break;
- }
- }
- auto bytesReaded = readBuffer.read(buff.data(), 256_KB);
- sb.append(buff.data(), buff.data() + bytesReaded);
- }
+ BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
- fileDesc.Contents = sb;
+ auto actorSystem = GetActorSystem();
+ auto selfId = SelfActorId;
+ size_t cntBlocksInFly = 0;
- } else {
- needWaitFinish = false;
- }
- auto actorSystem = GetActorSystem();
- auto onResolve = [actorSystem, actorId = this->SelfActorId] {
- actorSystem->Send(new IEventHandle(actorId, actorId, new TEvPrivate::TEvFutureResolved()));
- };
- auto future = ArrowReader->GetSchema(fileDesc);
- future.Subscribe([onResolve](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) {
- onResolve();
- });
- WaitEvent<TEvPrivate::TEvFutureResolved>();
- auto result = future.GetValue();
- std::shared_ptr<arrow::Schema> schema = result.Schema;
- std::vector<int> columnIndices;
- std::vector<TColumnConverter> columnConverters;
- columnConverters.reserve(ReadSpec->ArrowSchema->num_fields());
- for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) {
- const auto& targetField = ReadSpec->ArrowSchema->field(i);
- auto srcFieldIndex = schema->GetFieldIndex(targetField->name());
- YQL_ENSURE(srcFieldIndex != -1, "Missing field: " << targetField->name());
- auto targetType = targetField->type();
- auto originalType = schema->field(srcFieldIndex)->type();
- YQL_ENSURE(!originalType->layout().has_dictionary, "Unsupported dictionary encoding is used for field: "
- << targetField->name() << ", type: " << originalType->ToString());
- columnIndices.push_back(srcFieldIndex);
- auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
- YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
- columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
- }
+ for (int group = 0; group < fileReader->num_row_groups(); group++) {
- fileDesc.Cookie = result.Cookie;
- TArrowParquetBatchReader reader(std::move(fileDesc),
- ArrowReader,
- result.NumRowGroups,
- std::move(columnIndices),
- std::move(columnConverters),
- onResolve,
- [&] { WaitEvent<TEvPrivate::TEvFutureResolved>(); });
- ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal);
- } else {
- std::unique_ptr<NDB::ReadBuffer> buffer;
- if (isLocal) {
- buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path);
- } else {
- buffer = std::make_unique<TReadBufferFromStream>(this);
- }
+ std::shared_ptr<arrow::Table> table;
+ THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table));
- const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
- YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
+ auto reader = std::make_unique<arrow::TableBatchReader>(*table);
- auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings));
- TBlockReader reader(std::move(stream));
- ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal);
+ std::shared_ptr<arrow::RecordBatch> batch;
+ ::arrow::Status status;
+ while (status = reader->ReadNext(&batch), status.ok() && batch) {
+ if (++cntBlocksInFly > MaxBlocksInFly) {
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
+ --cntBlocksInFly;
+ }
+ Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
+ ConvertArrowColumns(batch, columnConverters), PathIndex, [actorSystem, selfId]() {
+ actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed()));
+ }, TakeIngressDelta()
+ ));
}
- } catch (const TS3ReadError&) {
- // Finish reading. Add error from server to issues
- LOG_CORO_D("TS3ReadCoroImpl", "S3 read error. Path: " << Path);
- } catch (const TDtorException&) {
- throw;
- } catch (const NDB::Exception& err) {
- TStringBuilder msgBuilder;
- msgBuilder << err.message();
- if (err.code()) {
- msgBuilder << " (code: " << err.code() << ")";
+ if (!status.ok()) {
+ throw yexception() << status.ToString();
}
- exceptIssue.SetMessage(msgBuilder);
- fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
- RetryStuff->Cancel();
- } catch (const std::exception& err) {
- exceptIssue.SetMessage(err.what());
- fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
- RetryStuff->Cancel();
+ }
+ while (cntBlocksInFly--) {
+ WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>();
}
- if (needWaitFinish) {
- WaitFinish();
+ LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED");
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvReadStarted, Handle);
+ hFunc(TEvPrivate::TEvDataPart, Handle);
+ hFunc(TEvPrivate::TEvReadFinished, Handle);
+ hFunc(TEvPrivate::TEvPause, Handle);
+ hFunc(TEvPrivate::TEvContinue, Handle);
+ hFunc(NActors::TEvents::TEvPoison, Handle);
+ )
+
+ void ProcessOneEvent() {
+ if (!Paused && !DeferredDataParts.empty()) {
+ ExtractDataPart(*DeferredDataParts.front(), true);
+ DeferredDataParts.pop();
+ if (DeferredQueueSize) {
+ DeferredQueueSize->Dec();
+ }
+ return;
}
+ TAutoPtr<::NActors::IEventHandle> ev(WaitForEvent().Release());
+ StateFunc(ev, GetActorContext());
+ }
+
+ void ExtractDataPart(TEvPrivate::TEvDataPart& event, bool deferred = false) {
+ InputBuffer = event.Result.Extract();
+ IngressBytes += InputBuffer.size();
+ RetryStuff->Offset += InputBuffer.size();
+ RetryStuff->SizeLimit -= InputBuffer.size();
+ LastOffset = RetryStuff->Offset;
+ LastData = InputBuffer;
+ LOG_CORO_T("TEvDataPart (" << (deferred ? "deferred" : "instant") << "), size: " << InputBuffer.size());
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ }
+ void Handle(TEvPrivate::TEvReadStarted::TPtr& ev) {
+ HttpResponseCode = ev->Get()->HttpResponseCode;
+ LOG_CORO_D("TEvReadStarted, Http code: " << HttpResponseCode);
+ }
+
+ void Handle(TEvPrivate::TEvDataPart::TPtr& ev) {
+ if (HttpDataRps) {
+ HttpDataRps->Inc();
+ }
+ if (200L == HttpResponseCode || 206L == HttpResponseCode) {
+ if (Paused || !DeferredDataParts.empty()) {
+ DeferredDataParts.push(std::move(ev->Release()));
+ if (DeferredQueueSize) {
+ DeferredQueueSize->Inc();
+ }
+ } else {
+ ExtractDataPart(*ev->Get(), false);
+ }
+ } else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) {
+ ServerReturnedError = true;
+ if (ErrorText.size() < 256_KB)
+ ErrorText.append(ev->Get()->Result.Extract());
+ else if (!ErrorText.EndsWith(TruncatedSuffix))
+ ErrorText.append(TruncatedSuffix);
+ LOG_CORO_W("TEvDataPart, ERROR: " << ErrorText << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
+ }
+ }
+
+ void Handle(TEvPrivate::TEvReadFinished::TPtr& ev) {
+
+ Issues.Clear();
if (!ErrorText.empty()) {
TString errorCode;
TString message;
@@ -1550,66 +1580,149 @@ private:
Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message));
}
- if (exceptIssue.GetMessage()) {
- Issues.AddIssue(exceptIssue);
+ if (ev->Get()->Issues) {
+ Issues.AddIssues(ev->Get()->Issues);
}
- auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues));
- if (issues)
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
- else
- Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, GetIngressDelta()));
- } catch (const TS3ReadAbort&) {
- LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path);
- } catch (const TDtorException&) {
- return RetryStuff->Cancel();
- } catch (const std::exception& err) {
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what())}, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
- return;
- }
-
- template <typename T, typename TEv>
- void ProcessBatches(IBatchReader<T>& reader, bool isLocal) {
- auto actorSystem = GetActorSystem();
- auto selfActorId = SelfActorId;
- size_t cntBlocksInFly = 0;
- if (isLocal) {
- for (;;) {
- T batch;
+ if (HttpResponseCode >= 300) {
+ ServerReturnedError = true;
+ Issues.AddIssue(TIssue{TStringBuilder() << "HTTP error code: " << HttpResponseCode});
+ }
- if (!reader.Next(batch)) {
- break;
- }
- if (++cntBlocksInFly > MaxBlocksInFly) {
- WaitEvent<TEvPrivate::TEvBlockProcessed>();
- --cntBlocksInFly;
- }
- Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() {
- actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed()));
- }, GetIngressDelta()));
+ if (Issues) {
+ RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode >= 300 ? HttpResponseCode : 0);
+ LOG_CORO_D("TEvReadFinished with Issues (try to retry): " << Issues.ToOneLineString());
+ if (!RetryStuff->NextRetryDelay) {
+ RetryStuff->RetryState = nullptr;
+ InputFinished = true;
+ LOG_CORO_W("ReadError: " << Issues.ToOneLineString() << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
+ throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
}
- while (cntBlocksInFly--) {
- WaitEvent<TEvPrivate::TEvBlockProcessed>();
+ }
+
+ if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) {
+ GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize))));
+ InputBuffer.clear();
+ if (DeferredDataParts.size()) {
+ if (DeferredQueueSize) {
+ DeferredQueueSize->Sub(DeferredDataParts.size());
+ }
+ std::queue<THolder<TEvPrivate::TEvDataPart>> tmp;
+ DeferredDataParts.swap(tmp);
}
} else {
- for (;;) {
- T batch;
- if (!reader.Next(batch)) {
- break;
+ LOG_CORO_D("TEvReadFinished, LastOffset: " << LastOffset << ", Error: " << ServerReturnedError);
+ InputFinished = true;
+ if (ServerReturnedError) {
+ throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
+ }
+ }
+ }
+
+ void Handle(TEvPrivate::TEvPause::TPtr&) {
+ LOG_CORO_D("TEvPause");
+ Paused = true;
+ }
+
+ void Handle(TEvPrivate::TEvContinue::TPtr&) {
+ LOG_CORO_D("TEvContinue");
+ Paused = false;
+ }
+
+ void Handle(NActors::TEvents::TEvPoison::TPtr&) {
+ RetryStuff->Cancel();
+ throw TS3ReadAbort();
+ }
+
+private:
+ static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
+public:
+ TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
+ const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
+ const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader,
+ const TS3ReadActorFactoryConfig& readActorFactoryCfg,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps)
+ : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
+ TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
+ PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader),
+ DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps)
+ {}
+
+ ~TS3ReadCoroImpl() override {
+ if (DeferredDataParts.size() && DeferredQueueSize) {
+ DeferredQueueSize->Sub(DeferredDataParts.size());
+ }
+ }
+
+private:
+ ui64 TakeIngressDelta() {
+ auto currentIngressBytes = IngressBytes;
+ IngressBytes = 0;
+ return currentIngressBytes;
+ }
+
+ void Run() final {
+
+ NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
+ try {
+ if (ReadSpec->Arrow) {
+ if (ReadSpec->Compression) {
+ Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression, use Pragma DisableUseBlocks"));
+ fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
+ } else if (ReadSpec->ThreadPool) {
+ RunThreadPoolBlockArrowParser();
+ } else {
+ if (Url.StartsWith("file://")) {
+ RunCoroBlockArrowParserOverFile();
+ } else {
+ RunCoroBlockArrowParserOverHttp();
+ }
+ }
+ } else {
+ try {
+ if (Url.StartsWith("file://")) {
+ RunClickHouseParserOverFile();
+ } else {
+ RunClickHouseParserOverHttp();
+ }
+ } catch (const TS3ReadError&) {
+ // Just to avoid parser error after transport failure
+ LOG_CORO_D("S3 read ERROR");
+ } catch (const NDB::Exception& err) {
+ TStringBuilder msgBuilder;
+ msgBuilder << err.message();
+ if (err.code()) {
+ msgBuilder << " (code: " << err.code() << ")";
+ }
+ Issues.AddIssue(TIssue(msgBuilder));
+ fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
+ RetryStuff->Cancel();
}
- Send(ParentActorId, new TEv(batch, PathIndex, [](){}, GetIngressDelta()));
}
+ } catch (const TS3ReadAbort&) {
+ // Poison handler actually
+ LOG_CORO_D("S3 read ABORT");
+ } catch (const TDtorException&) {
+ // Stop any activity instantly
+ RetryStuff->Cancel();
+ return;
+ } catch (const std::exception& err) {
+ Issues.AddIssue(TIssue(err.what()));
+ fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR;
+ RetryStuff->Cancel();
}
+
+ auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues));
+ if (issues)
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode));
+ else
+ Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta()));
}
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final {
- TStringBuilder message;
- message << "Error while reading file " << Path << ", details: "
- << "S3 read. Unexpected message type " << Hex(ev->GetTypeRewrite());
- if (auto* eventBase = ev->GetBase()) {
- message << " (" << eventBase->ToStringHeader() << ")";
- }
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(message)}, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
+ return StateFunc(ev, GetActorContext());
}
TString GetLastDataAsText() {
@@ -1667,8 +1780,9 @@ private:
std::size_t MaxBlocksInFly = 2;
IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
+ TString InputBuffer;
bool Paused = false;
- std::queue<THolder<IEventHandle>> DeferredEvents;
+ std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts;
const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
@@ -1676,26 +1790,13 @@ private:
class TS3ReadCoroActor : public TActorCoro {
public:
- TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, bool isDownloadNeeded, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize)
+ TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl)
: TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
- , RetryStuff(std::move(retryStuff))
- , PathIndex(pathIndex)
- , IsDownloadNeeded(isDownloadNeeded)
- , HttpInflightSize(httpInflightSize)
{}
private:
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
- if (IsDownloadNeeded && RetryStuff->Url.substr(0, 6) != "file://") {
- LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". " << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]");
- DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex, HttpInflightSize);
- }
}
-
- const TRetryStuff::TPtr RetryStuff;
- const size_t PathIndex;
- const bool IsDownloadNeeded;
- const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
};
class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput {
@@ -1821,7 +1922,7 @@ public:
InputIndex,
TxId,
ComputeActorId,
- stuff,
+ std::move(stuff),
ReadSpec,
pathIndex,
objectPath.Path,
@@ -1832,10 +1933,7 @@ public:
DeferredQueueSize,
HttpInflightSize,
HttpDataRps);
- auto isDownloadNeeded = impl->IsDownloadNeeded();
- const auto& httpInflightSize = impl->HttpInflightSize;
- CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(
- std::move(impl), std::move(stuff), pathIndex, isDownloadNeeded, httpInflightSize)));
+ CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl))));
}
TObjectPath ReadPathFromCache() {
@@ -2411,6 +2509,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto readSpec = std::make_shared<TReadSpec>();
readSpec->Arrow = params.GetArrow();
+ readSpec->ThreadPool = params.GetThreadPool();
if (readSpec->Arrow) {
arrow::SchemaBuilder builder;
const TStringBuf blockLengthColumn("_yql_block_length"sv);
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 06b69e4d1e..7a8e65a476 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -77,6 +77,11 @@
"Match": {"Type": "Callable", "Name": "S3ArrowSettings"}
},
{
+ "Name": "TS3CoroArrowSettings",
+ "Base": "TS3ParseSettingsBase",
+ "Match": {"Type": "Callable", "Name": "S3CoroArrowSettings"}
+ },
+ {
"Name": "TS3Read",
"Base": "TFreeArgCallable",
"Match": {"Type": "Callable", "Name": "Read!"},
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index 3b01e90ec1..c174ce2cd9 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -16,4 +16,5 @@ message TSource {
optional string Format = 5;
map<string, string> Settings = 6;
bool Arrow = 7;
+ bool ThreadPool = 8;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 0e1c9a218c..00aeccd4cc 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -111,6 +111,7 @@ public:
AddHandler({TS3SourceSettings::CallableName()}, Hndl(&TSelf::HandleS3SourceSettings));
AddHandler({TS3ParseSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase));
AddHandler({TS3ArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase));
+ AddHandler({TS3CoroArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase));
AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig));
}
@@ -177,7 +178,7 @@ public:
}
const TTypeAnnotationNode* itemType = nullptr;
- if (input->Content() == TS3ArrowSettings::CallableName()) {
+ if (input->Content() == TS3ArrowSettings::CallableName() || input->Content() == TS3CoroArrowSettings::CallableName()) {
TVector<const TItemExprType*> blockRowTypeItems;
for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType())));
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 28cabdb079..3a21c6929a 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -192,10 +192,10 @@ public:
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
supportedArrowTypes = resolveStatus == IArrowResolver::OK;
}
-
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3ParseSettingsBase>()
- .CallableName((supportedArrowTypes && format == "parquet") ? TS3ArrowSettings::CallableName() :
+ .CallableName((supportedArrowTypes && format == "parquet") ?
+ (State_->Configuration->ArrowThreadPool.Get().GetOrElse(true) ? TS3ArrowSettings::CallableName() : TS3CoroArrowSettings::CallableName() ):
TS3ParseSettings::CallableName())
.Paths(s3ReadObject.Object().Paths())
.Token<TCoSecureParam>()
@@ -277,7 +277,8 @@ public:
if (const auto mayParseSettings = settings.Maybe<TS3ParseSettingsBase>()) {
const auto parseSettings = mayParseSettings.Cast();
srcDesc.SetFormat(parseSettings.Format().StringValue().c_str());
- srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()));
+ srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()) || bool(parseSettings.Maybe<TS3CoroArrowSettings>()));
+ srcDesc.SetThreadPool(bool(parseSettings.Maybe<TS3ArrowSettings>()));
const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
// exclude extra columns to get actual row type we need to read from input
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index bd3e92d89a..997090570f 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -514,7 +514,7 @@ public:
}
}
- if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>()) {
+ if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>() && !parseSettingsBase.Maybe<TS3CoroArrowSettings>()) {
const TStructExprType* readRowDataType = ctx.MakeType<TStructExprType>(readRowDataItems);
auto item = GetLightColumn(*readRowDataType);
YQL_ENSURE(item);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index 72d7a691a0..a60dfa7639 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -15,6 +15,7 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, SerializeMemoryLimit);
REGISTER_SETTING(*this, InFlightMemoryLimit);
REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000);
+ REGISTER_SETTING(*this, ArrowThreadPool);
}
TS3Settings::TConstPtr TS3Configuration::Snapshot() const {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 86bb1ed5c9..e2f5caa3cd 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -17,6 +17,7 @@ struct TS3Settings {
NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions.
NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink.
NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000.
+ NCommon::TConfSetting<bool, false> ArrowThreadPool;
};
struct TS3ClusterSettings {