| author | hor911 <hor911@ydb.tech> | 2023-03-01 20:20:07 +0300 |
|---|---|---|
| committer | hor911 <hor911@ydb.tech> | 2023-03-01 20:20:07 +0300 |
| commit | 4217c0e360c22b33763317089141ab196386c21b (patch) | |
| tree | d72e4f9baa074d45bcf9752bfcf94d2c3f63b353 | |
| parent | 23f0afc1966496d0937ee30f2aba19e77f93d17f (diff) | |
| download | ydb-4217c0e360c22b33763317089141ab196386c21b.tar.gz | |
Cororead refactored
13 files changed, 572 insertions, 455 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 296418df1f..714a696f2c 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -1661,6 +1661,10 @@ private: *gatewaysConfig.MutableS3() = Params.Config.GetGateways().GetS3(); gatewaysConfig.MutableS3()->ClearClusterMapping(); + auto* attr = gatewaysConfig.MutableS3()->MutableDefaultSettings()->Add(); + attr->SetName("ArrowThreadPool"); + attr->SetValue("false"); + THashMap<TString, TString> clusters; TString monitoringEndpoint = Params.Config.GetCommon().GetMonitoringEndpoint(); @@ -1698,7 +1702,8 @@ private: } { - dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory)); + dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory, + Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles())); } { diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto index 013fbaeb84..3fa3fdcd15 100644 --- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto +++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto @@ -13,6 +13,7 @@ message TS3ReadActorFactoryConfig { uint64 RowsInBatch = 2; // Default = 1000 uint64 MaxInflight = 3; // Default = 20 uint64 DataInflight = 4; // Default = 200 MB + bool AllowLocalFiles = 5; } message TPqReadActorFactoryConfig { diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 049a77b401..f90db4d998 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -164,7 +164,7 @@ void Init( if (protoConfig.GetPrivateApi().GetEnabled()) { const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig(); - auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(s3readConfig.GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one + auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::Max()); NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg; if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) { readActorFactoryCfg.RowsInBatch = rowsInBatch; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 8dc0d00ae0..169815a33b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1627,7 +1627,7 @@ protected: Y_VERIFY(!TaskRunner || info.Buffer); if (info.Finished) { - CA_LOG_D("Skip polling async input[" << inputIndex << "]: finished"); + CA_LOG_T("Skip polling async input[" << inputIndex << "]: finished"); return; } @@ -1670,7 +1670,7 @@ protected: AsyncInputPush(std::move(batch), info, space, finished); } else { - CA_LOG_D("Skip polling async input[" << inputIndex << "]: no free space: " << freeSpace); + CA_LOG_T("Skip polling async input[" << inputIndex << "]: no free space: " << freeSpace); ContinueExecute(); // If there is no free space in buffer, => we have something to process } } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 29a05254a6..6dba58a1d8 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ 
b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -411,16 +411,18 @@ private: OnFinish(TIssues{error}); } - void MaybeStart() { + void MaybeStart(long httpResponseCode = 0) { if (!HttpResponseCode) { - curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &HttpResponseCode); + if (!httpResponseCode) { + curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); + } + HttpResponseCode = httpResponseCode; OnStart(HttpResponseCode); } } void Done(CURLcode result, long httpResponseCode) final { - HttpResponseCode = httpResponseCode; - OnStart(HttpResponseCode); + MaybeStart(httpResponseCode); if (CURLE_OK != result) { return Fail(TIssue(TStringBuilder{} << "error: " << curl_easy_strerror(result) << " detailed: " << GetDetailedErrorText())); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index af9ebb83be..1951d4ec1b 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -26,6 +26,7 @@ #include <arrow/io/api.h> #include <arrow/compute/cast.h> #include <arrow/status.h> +#include <arrow/util/future.h> #include <parquet/arrow/reader.h> #include <parquet/file_reader.h> @@ -68,6 +69,7 @@ #include <util/generic/size_literals.h> #include <util/stream/format.h> +#include <util/system/fstat.h> #include <queue> @@ -80,24 +82,29 @@ #define LOG_E(name, stream) \ LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) #define LOG_W(name, stream) \ - LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) + LOG_WARN_S (*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) #define LOG_I(name, stream) \ - LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) + LOG_INFO_S (*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) #define LOG_D(name, stream) \ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) #define LOG_T(name, stream) \ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, name << ": " << this->SelfId() << ", TxId: " << TxId << ". " << stream) -#define LOG_CORO_E(name, stream) \ - LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) -#define LOG_CORO_W(name, stream) \ - LOG_WARN_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) -#define LOG_CORO_I(name, stream) \ - LOG_INFO_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) -#define LOG_CORO_D(name, stream) \ - LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". " << stream) -#define LOG_CORO_T(name, stream) \ - LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, name << ": " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId << ". 
" << stream) +#define LOG_CORO_E(stream) \ + LOG_ERROR_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \ + << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream) +#define LOG_CORO_W(stream) \ + LOG_WARN_S (GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \ + << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream) +#define LOG_CORO_I(stream) \ + LOG_INFO_S (GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \ + << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream) +#define LOG_CORO_D(stream) \ + LOG_DEBUG_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \ + << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream) +#define LOG_CORO_T(stream) \ + LOG_TRACE_S(GetActorContext(), NKikimrServices::KQP_COMPUTE, "TS3ReadCoroImpl: " << SelfActorId << ", CA: " << ComputeActorId << ", TxId: " << TxId \ + << " [" << Path << "]. RETRY{ Offset: " << RetryStuff->Offset << ", Delay: " << RetryStuff->NextRetryDelay << ", RequestId: " << RetryStuff->RequestId << "}. " << stream) #define THROW_ARROW_NOT_OK(status) \ do \ @@ -155,6 +162,7 @@ struct TEvPrivate { EvFutureResolved, EvObjectPathBatch, EvObjectPathReadError, + EvReadResult2, EvEnd }; @@ -271,6 +279,15 @@ struct TEvPrivate { TIssues Issues; TEvObjectPathReadError(TIssues issues) : Issues(std::move(issues)) { } }; + + struct TEvReadResult2 : public TEventLocal<TEvReadResult2, EvReadResult2> { + TEvReadResult2(IHTTPGateway::TContent&& result) : Failure(false), Result(std::move(result)) { } + TEvReadResult2(TIssues&& issues) : Failure(true), Result(""), Issues(std::move(issues)) { } + const bool Failure; + IHTTPGateway::TContent Result; + const TIssues Issues; + }; + }; using namespace NKikimr::NMiniKQL; @@ -976,6 +993,7 @@ struct TReadSpec { using TPtr = std::shared_ptr<TReadSpec>; bool Arrow = false; + bool ThreadPool = false; std::unordered_map<TStringBuf, TType*, THash<TStringBuf>> RowSpec; NDB::ColumnsWithTypeAndName CHColumns; std::shared_ptr<arrow::Schema> ArrowSchema; @@ -1004,7 +1022,6 @@ struct TRetryStuff { , SizeLimit(sizeLimit) , TxId(txId) , RequestId(requestId) - , RetryState(retryPolicy->CreateRetryState()) , RetryPolicy(retryPolicy) , Cancelled(false) {} @@ -1062,92 +1079,8 @@ void DownloadStart(const TRetryStuff::TPtr& retryStuff, TActorSystem* actorSyste inflightCounter); } -template <typename T> -class IBatchReader { -public: - virtual ~IBatchReader() = default; - - virtual bool Next(T& value) = 0; -}; - -class TBlockReader : public IBatchReader<NDB::Block> { -public: - TBlockReader(std::unique_ptr<NDB::IBlockInputStream>&& stream) - : Stream(std::move(stream)) - {} - - bool Next(NDB::Block& value) final { - value = Stream->read(); - return !!value; - } - -private: - std::unique_ptr<NDB::IBlockInputStream> Stream; -}; - using 
TColumnConverter = std::function<std::shared_ptr<arrow::Array>(const std::shared_ptr<arrow::Array>&)>; -class TArrowParquetBatchReader : public IBatchReader<std::shared_ptr<arrow::RecordBatch>> { -public: - TArrowParquetBatchReader(TArrowFileDesc&& fileDesc, IArrowReader::TPtr arrowReader, int numRowGroups, std::vector<int>&& columnIndices, std::vector<TColumnConverter>&& columnConverters, std::function<void()>&& onFutureResolve, std::function<void()>&& waitForFutureResolve) - : FileDesc(std::move(fileDesc)) - , ArrowReader(arrowReader) - , ColumnIndices(std::move(columnIndices)) - , ColumnConverters(std::move(columnConverters)) - , TotalGroups(numRowGroups) - , CurrentGroup(0) - , OnFutureResolve(std::move(onFutureResolve)) - , WaitForFutureResolve(std::move(waitForFutureResolve)) - {} - - bool Next(std::shared_ptr<arrow::RecordBatch>& value) final { - for (;;) { - if (CurrentGroup == TotalGroups) { - return false; - } - - if (!CurrentBatchReader) { - auto future = ArrowReader->ReadRowGroup(FileDesc, CurrentGroup++, ColumnIndices); - future.Subscribe([f = OnFutureResolve](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){ - f(); - }); - WaitForFutureResolve(); - CurrentTable = future.GetValue(); - CurrentBatchReader = std::make_unique<arrow::TableBatchReader>(*CurrentTable); - } - - THROW_ARROW_NOT_OK(CurrentBatchReader->ReadNext(&value)); - if (value) { - auto columns = value->columns(); - for (size_t i = 0; i < ColumnConverters.size(); ++i) { - auto converter = ColumnConverters[i]; - if (converter) { - columns[i] = converter(columns[i]); - } - } - - value = arrow::RecordBatch::Make(value->schema(), value->num_rows(), columns); - return true; - } - - CurrentBatchReader = nullptr; - CurrentTable = nullptr; - } - } - -private: - TArrowFileDesc FileDesc; - IArrowReader::TPtr ArrowReader; - const std::vector<int> ColumnIndices; - std::vector<TColumnConverter> ColumnConverters; - const int TotalGroups; - int CurrentGroup; - std::shared_ptr<arrow::Table> CurrentTable; - std::unique_ptr<arrow::TableBatchReader> CurrentBatchReader; - std::function<void()> OnFutureResolve; - std::function<void()> WaitForFutureResolve; -}; - template <bool isOptional> std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value) { ::NYql::NUdf::TFixedSizeArrayBuilder<ui16, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length()); @@ -1173,7 +1106,7 @@ std::shared_ptr<arrow::Array> ArrowDate32AsYqlDate(const std::shared_ptr<arrow:: } TColumnConverter BuildColumnConverter(const std::string& columnName, const std::shared_ptr<arrow::DataType>& originalType, const std::shared_ptr<arrow::DataType>& targetType, TType* yqlType) { - if (originalType->id() == arrow::Type::DATE32) { +if (originalType->id() == arrow::Type::DATE32) { // TODO: support more than 1 optional level bool isOptional = false; auto unpackedYqlType = UnpackOptional(yqlType, isOptional); @@ -1199,6 +1132,7 @@ TColumnConverter BuildColumnConverter(const std::string& columnName, const std:: YQL_ENSURE(arrow::compute::CanCast(*originalType, *targetType), "Mismatch type for field: " << columnName << ", expected: " << targetType->ToString() << ", got: " << originalType->ToString()); + return [targetType](const std::shared_ptr<arrow::Array>& value) { auto res = arrow::compute::Cast(*value, targetType); THROW_ARROW_NOT_OK(res.status()); @@ -1206,341 +1140,437 @@ TColumnConverter BuildColumnConverter(const 
std::string& columnName, const std:: }; } +std::shared_ptr<arrow::RecordBatch> ConvertArrowColumns(std::shared_ptr<arrow::RecordBatch> batch, std::vector<TColumnConverter>& columnConverters) { + auto columns = batch->columns(); + for (size_t i = 0; i < columnConverters.size(); ++i) { + auto converter = columnConverters[i]; + if (converter) { + columns[i] = converter(columns[i]); + } + } + return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns); +} + class TS3ReadCoroImpl : public TActorCoroImpl { friend class TS3StreamReadActor; -private: - class TReadBufferFromStream : public NDB::ReadBuffer { + +public: + + class THttpRandomAccessFile : public arrow::io::RandomAccessFile { public: - TReadBufferFromStream(TS3ReadCoroImpl* coro) - : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro) - {} + THttpRandomAccessFile(TS3ReadCoroImpl* coro, size_t fileSize) : Coro(coro), FileSize(fileSize) { + } + + // has no meaning and use + arrow::Result<int64_t> GetSize() override { return FileSize; } + arrow::Result<int64_t> Tell() const override { return InnerPos; } + arrow::Status Seek(int64_t position) override { InnerPos = position; return {}; } + arrow::Status Close() override { return {}; } + bool closed() const override { return false; } + // must not be used currently + arrow::Result<int64_t> Read(int64_t, void*) override { + Y_VERIFY(0); + return arrow::Result<int64_t>(); + } + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t) override { + Y_VERIFY(0); + return arrow::Result<std::shared_ptr<arrow::Buffer>>(); + } + arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(const arrow::io::IOContext&, int64_t, int64_t) override { + Y_VERIFY(0); + return arrow::Future<std::shared_ptr<arrow::Buffer>>(); + } + // the only useful + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override { + return Coro->ReadAt(position, nbytes); + } + // future work + arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>&) override { return{}; } + + private: + TS3ReadCoroImpl *const Coro; + const size_t FileSize; + int64_t InnerPos = 0; + }; + + class TRandomAccessFileTrafficCounter : public arrow::io::RandomAccessFile { + public: + TRandomAccessFileTrafficCounter(TS3ReadCoroImpl* coro, std::shared_ptr<arrow::io::RandomAccessFile> impl) + : Coro(coro), Impl(impl) { + } + arrow::Result<int64_t> GetSize() override { return Impl->GetSize(); } + virtual arrow::Result<int64_t> Tell() const override { return Impl->Tell(); } + virtual arrow::Status Seek(int64_t position) override { return Impl->Seek(position); } + virtual arrow::Status Close() override { return Impl->Close(); } + virtual bool closed() const override { return Impl->closed(); } + arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override { return Impl->Read(nbytes, buffer); } + arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override { return Impl->Read(nbytes); } + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override { + auto result = Impl->ReadAt(position, nbytes); + Coro->IngressBytes += nbytes; + return result; + } + arrow::Status WillNeed(const std::vector<arrow::io::ReadRange>& ranges) override { return Impl->WillNeed(ranges); } + + private: + TS3ReadCoroImpl *const Coro; + std::shared_ptr<arrow::io::RandomAccessFile> Impl; + }; + + class TCoroReadBuffer : public NDB::ReadBuffer { + public: + TCoroReadBuffer(TS3ReadCoroImpl* coro) + : NDB::ReadBuffer(nullptr, 0ULL) + , Coro(coro) + { } private: bool nextImpl() final { - 
while (Coro->Next(Value)) { - if (!Value.empty()) { - working_buffer = NDB::BufferBase::Buffer(const_cast<char*>(Value.data()), const_cast<char*>(Value.data()) + Value.size()); + while (!Coro->InputFinished || !Coro->DeferredDataParts.empty()) { + Coro->ProcessOneEvent(); + if (Coro->InputBuffer) { + WorkingBuffer.swap(Coro->InputBuffer); + Coro->InputBuffer.clear(); + auto rawData = const_cast<char*>(WorkingBuffer.data()); + working_buffer = NDB::BufferBase::Buffer(rawData, rawData + WorkingBuffer.size()); return true; } } return false; } - TS3ReadCoroImpl *const Coro; - TString Value; + TString WorkingBuffer; }; - static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv; -public: - TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, - const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, - const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, - const TS3ReadActorFactoryConfig& readActorFactoryCfg, - const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, - const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, - const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps) - : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), - TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), - PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader), - DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps) - {} + void RunClickHouseParserOverHttp() { - ~TS3ReadCoroImpl() override { - if (DeferredEvents.size() && DeferredQueueSize) { - DeferredQueueSize->Sub(DeferredEvents.size()); + LOG_CORO_D("RunClickHouseParserOverHttp"); + + std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this); + std::unique_ptr<NDB::ReadBuffer> decompressorBuffer; + NDB::ReadBuffer* buffer = coroBuffer.get(); + + if (ReadSpec->Compression) { + decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression); + YQL_ENSURE(buffer, "Unsupported " << ReadSpec->Compression << " compression."); + buffer = decompressorBuffer.get(); } - } - bool IsDownloadNeeded() const { - return !ReadSpec->Arrow || !ReadSpec->Compression.empty(); + DownloadStart(RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize); + + auto stream = std::make_unique<NDB::InputStreamFromInputFormat>( + NDB::FormatFactory::instance().getInputFormat( + ReadSpec->Format, *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings + ) + ); + + while (NDB::Block batch = stream->read()) { + Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [](){}, TakeIngressDelta())); + } + + LOG_CORO_D("RunClickHouseParserOverHttp - FINISHED"); } - bool Next(TString& value) { - if (InputFinished) - return false; + void RunClickHouseParserOverFile() { - if (Paused || DeferredEvents.empty()) { - auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted - , TEvPrivate::TEvDataPart - , TEvPrivate::TEvReadFinished - , TEvPrivate::TEvPause - , TEvPrivate::TEvContinue - , NActors::TEvents::TEvPoison>(); + LOG_CORO_D("RunClickHouseParserOverFile"); - switch (const auto etype = ev->GetTypeRewrite()) { - case TEvPrivate::TEvPause::EventType: - Paused = true; - break; - case TEvPrivate::TEvContinue::EventType: - Paused = false; - break; - 
case NActors::TEvents::TEvPoison::EventType: - RetryStuff->Cancel(); - throw TS3ReadAbort(); - default: - DeferredEvents.push(std::move(ev)); - if (DeferredQueueSize) { - DeferredQueueSize->Inc(); - } - break; - } + TString fileName = Url.substr(7) + Path; + + std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<NDB::ReadBufferFromFile>(fileName); + std::unique_ptr<NDB::ReadBuffer> decompressorBuffer; + NDB::ReadBuffer* buffer = coroBuffer.get(); + + if (ReadSpec->Compression) { + decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression); + YQL_ENSURE(buffer, "Unsupported " << ReadSpec->Compression << " compression."); + buffer = decompressorBuffer.get(); } - if (Paused || DeferredEvents.empty()) { - value.clear(); - return true; + auto stream = std::make_unique<NDB::InputStreamFromInputFormat>( + NDB::FormatFactory::instance().getInputFormat( + ReadSpec->Format, *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings + ) + ); + + auto actorSystem = GetActorSystem(); + auto selfId = SelfActorId; + size_t cntBlocksInFly = 0; + + while (NDB::Block batch = stream->read()) { + if (++cntBlocksInFly > MaxBlocksInFly) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + --cntBlocksInFly; + } + Send(ParentActorId, new TEvPrivate::TEvNextBlock(batch, PathIndex, [actorSystem, selfId]() { + actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed())); + }, TakeIngressDelta())); } + while (cntBlocksInFly--) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + } + + IngressBytes += GetFileLength(fileName); - THolder<IEventHandle> ev; - ev.Swap(DeferredEvents.front()); - DeferredEvents.pop(); - if (DeferredQueueSize) { - DeferredQueueSize->Dec(); + LOG_CORO_D("RunClickHouseParserOverFile FINISHED"); + } + + void BuildColumnConverters(std::shared_ptr<arrow::Schema> outputSchema, std::shared_ptr<arrow::Schema> dataSchema, + std::vector<int>& columnIndices, std::vector<TColumnConverter>& columnConverters) { + columnConverters.reserve(outputSchema->num_fields()); + for (int i = 0; i < outputSchema->num_fields(); ++i) { + const auto& targetField = outputSchema->field(i); + auto srcFieldIndex = dataSchema->GetFieldIndex(targetField->name()); + YQL_ENSURE(srcFieldIndex != -1, "Missing field: " << targetField->name()); + auto targetType = targetField->type(); + auto originalType = dataSchema->field(srcFieldIndex)->type(); + YQL_ENSURE(!originalType->layout().has_dictionary, "Unsupported dictionary encoding is used for field: " + << targetField->name() << ", type: " << originalType->ToString()); + columnIndices.push_back(srcFieldIndex); + auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name()); + YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec"); + columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second)); } + } - switch (const auto etype = ev->GetTypeRewrite()) { - case TEvPrivate::TEvReadStarted::EventType: - ErrorText.clear(); - Issues.Clear(); - value.clear(); - RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode); - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadStarted, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", Http code: " << HttpResponseCode << ", retry after: " << RetryStuff->NextRetryDelay << ", request id: [" << 
RetryStuff->RequestId << "]"); - if (!RetryStuff->NextRetryDelay) { // Success or not retryable - RetryStuff->RetryState = nullptr; - } - return true; - case TEvPrivate::TEvReadFinished::EventType: - Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues); + void RunThreadPoolBlockArrowParser() { - if (HttpResponseCode >= 300) { - ServerReturnedError = true; - Issues.AddIssue(TIssue{TStringBuilder() << "HTTP error code: " << HttpResponseCode}); - } + LOG_CORO_D("RunThreadPoolBlockArrowParser"); - if (Issues) { - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished. Url: " << RetryStuff->Url << ". Issues: " << Issues.ToOneLineString()); - if (!RetryStuff->NextRetryDelay) { - InputFinished = true; - LOG_CORO_W("TS3ReadCoroImpl", "ReadError: " << Issues.ToOneLineString() << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]"); - throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format - } - } + TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format); - if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) { - LOG_CORO_D("TS3ReadCoroImpl", "Retry Download in " << RetryStuff->NextRetryDelay << ", Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "], Issues: " << Issues.ToOneLineString()); - GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize)))); - value.clear(); - } else { - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", LastOffset: " << LastOffset << ", Error: " << ServerReturnedError << ", request id: [" << RetryStuff->RequestId << "]"); - InputFinished = true; - if (ServerReturnedError) { - throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format - } - return false; // end of data (real data, not an error) - } - return true; - case TEvPrivate::TEvDataPart::EventType: - if (HttpDataRps) { - HttpDataRps->Inc(); - } - if (200L == HttpResponseCode || 206L == HttpResponseCode) { - value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); - IngressBytes += value.size(); - RetryStuff->Offset += value.size(); - RetryStuff->SizeLimit -= value.size(); - LastOffset = RetryStuff->Offset; - LastData = value; - LOG_CORO_T("TS3ReadCoroImpl", "TEvDataPart, size: " << value.size() << ", Url: " << RetryStuff->Url << ", Offset (updated): " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); - } else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) { - ServerReturnedError = true; - if (ErrorText.size() < 256_KB) - ErrorText.append(ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract()); - else if (!ErrorText.EndsWith(TruncatedSuffix)) - ErrorText.append(TruncatedSuffix); - value.clear(); - LOG_CORO_W("TS3ReadCoroImpl", "TEvDataPart, ERROR: " << ErrorText << ", Url: " << RetryStuff->Url << ", Offset: " 
<< RetryStuff->Offset << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText() << ", request id: [" << RetryStuff->RequestId << "]"); - } - return true; - default: - return false; + auto actorSystem = GetActorSystem(); + auto selfId = SelfActorId; + + auto future = ArrowReader->GetSchema(fileDesc); + future.Subscribe([actorSystem, selfId](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) { + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved())); + }); + WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(); + auto result = future.GetValue(); + + fileDesc.Cookie = result.Cookie; + + std::shared_ptr<arrow::Schema> schema = result.Schema; + std::vector<int> columnIndices; + std::vector<TColumnConverter> columnConverters; + + BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); + + for (int group = 0; group < result.NumRowGroups; group++) { + + auto future = ArrowReader->ReadRowGroup(fileDesc, group, columnIndices); + future.Subscribe([actorSystem, selfId](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){ + actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved())); + }); + WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(); + auto table = future.GetValue(); + auto reader = std::make_unique<arrow::TableBatchReader>(*table); + + std::shared_ptr<arrow::RecordBatch> batch; + ::arrow::Status status; + while (status = reader->ReadNext(&batch), status.ok() && batch) { + Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( + ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta() + )); + } + if (!status.ok()) { + throw yexception() << status.ToString(); + } } + + LOG_CORO_D("RunThreadPoolBlockArrowParser FINISHED"); } -private: - ui64 GetIngressDelta() { - auto currentIngressBytes = IngressBytes; - IngressBytes = 0; - return currentIngressBytes; + + arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) { + RetryStuff->Gateway->Download(Url + Path, RetryStuff->Headers, + position, + nbytes, + std::bind(&OnResult, GetActorSystem(), SelfActorId, std::placeholders::_1), + {}, + RetryStuff->RetryPolicy); + auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult2>(); + if (ev->Get()->Failure) { + throw yexception() << ev->Get()->Issues.ToOneLineString(); + } else { + auto result = ev->Get()->Result.Extract(); + IngressBytes += nbytes; + return arrow::Buffer::FromString(result); + } } - void WaitFinish() { - LOG_CORO_D("TS3ReadCoroImpl", "WaitFinish: " << Path); - if (InputFinished) + static void OnResult(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) { + switch (result.index()) { + case 0U: + actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvReadResult2(std::get<IHTTPGateway::TContent>(std::move(result))))); return; - - while (true) { - const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted - , TEvPrivate::TEvDataPart - , TEvPrivate::TEvReadFinished - , TEvPrivate::TEvPause - , TEvPrivate::TEvContinue - , NActors::TEvents::TEvPoison>(); - - const auto etype = ev->GetTypeRewrite(); - switch (etype) { - case TEvPrivate::TEvReadFinished::EventType: - Issues = std::move(ev->Get<TEvPrivate::TEvReadFinished>()->Issues); - LOG_CORO_D("TS3ReadCoroImpl", "TEvReadFinished: " << Path << " " << Issues.ToOneLineString()); - break; - case NActors::TEvents::TEvPoison::EventType: - RetryStuff->Cancel(); - throw TS3ReadAbort(); - default: - continue; - } - 
InputFinished = true; + case 1U: + actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvReadResult2(std::get<TIssues>(std::move(result))))); return; + default: + break; } } - template<typename EvType> - void WaitEvent() { - auto event = WaitForEvent(); - TVector<THolder<IEventBase>> otherEvents; - while (!event->CastAsLocal<EvType>()) { - if (event->CastAsLocal<NActors::TEvents::TEvPoison>()) { - throw TS3ReadAbort(); + void RunCoroBlockArrowParserOverHttp() { + + LOG_CORO_D("RunCoroBlockArrowParserOverHttp"); + + std::shared_ptr<arrow::io::RandomAccessFile> arrowFile = std::make_shared<THttpRandomAccessFile>(this, RetryStuff->SizeLimit); + std::unique_ptr<parquet::arrow::FileReader> fileReader; + THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(arrowFile, arrow::default_memory_pool(), &fileReader)); + + std::shared_ptr<arrow::Schema> schema; + THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema)); + std::vector<int> columnIndices; + std::vector<TColumnConverter> columnConverters; + + BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); + + for (int group = 0; group < fileReader->num_row_groups(); group++) { + + std::shared_ptr<arrow::Table> table; + THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table)); + + auto reader = std::make_unique<arrow::TableBatchReader>(*table); + + std::shared_ptr<arrow::RecordBatch> batch; + ::arrow::Status status; + while (status = reader->ReadNext(&batch), status.ok() && batch) { + Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( + ConvertArrowColumns(batch, columnConverters), PathIndex, [](){}, TakeIngressDelta() + )); } - - if (!event->CastAsLocal<TEvPrivate::TEvPause>() && !event->CastAsLocal<TEvPrivate::TEvContinue>() && !event->CastAsLocal<TEvPrivate::TEvReadFinished>()) { - otherEvents.push_back(event->ReleaseBase()); + if (!status.ok()) { + throw yexception() << status.ToString(); } - - event = WaitForEvent(); } - for (auto& e: otherEvents) { - Send(SelfActorId, e.Release()); - } + LOG_CORO_D("RunCoroBlockArrowParserOverHttp - FINISHED"); } - void Run() final try { + void RunCoroBlockArrowParserOverFile() { - LOG_CORO_D("TS3ReadCoroImpl", "Run" << ", Path: " << Path); + LOG_CORO_D("RunCoroBlockArrowParserOverFile"); - NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; + std::shared_ptr<arrow::io::RandomAccessFile> arrowFile = + std::make_shared<TRandomAccessFileTrafficCounter>(this, + arrow::io::ReadableFile::Open((Url + Path).substr(7), arrow::default_memory_pool()).ValueOrDie() + ); + std::unique_ptr<parquet::arrow::FileReader> fileReader; + parquet::arrow::FileReaderBuilder builder; + builder.memory_pool(arrow::default_memory_pool()); + parquet::ArrowReaderProperties properties; + properties.set_pre_buffer(true); + builder.properties(properties); + THROW_ARROW_NOT_OK(builder.Open(arrowFile)); + THROW_ARROW_NOT_OK(builder.Build(&fileReader)); - TIssue exceptIssue; - bool isLocal = Url.StartsWith("file://"); - bool needWaitFinish = !isLocal; - try { - if (ReadSpec->Arrow) { - TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format); - if (IsDownloadNeeded()) { - // Read file entirely - std::unique_ptr<NDB::ReadBuffer> buffer; - if (isLocal) { - buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path); - } else { - buffer = std::make_unique<TReadBufferFromStream>(this); - } + std::shared_ptr<arrow::Schema> schema; + 
THROW_ARROW_NOT_OK(fileReader->GetSchema(&schema)); + std::vector<int> columnIndices; + std::vector<TColumnConverter> columnConverters; - const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); - YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); - auto& readBuffer = decompress ? *decompress : *buffer; - TStringBuilder sb; - TBuffer buff(256_KB); - - while (!readBuffer.eof()) { - if (!readBuffer.hasPendingData()) { - if (!readBuffer.next()) { - break; - } - } - auto bytesReaded = readBuffer.read(buff.data(), 256_KB); - sb.append(buff.data(), buff.data() + bytesReaded); - } + BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); - fileDesc.Contents = sb; + auto actorSystem = GetActorSystem(); + auto selfId = SelfActorId; + size_t cntBlocksInFly = 0; - } else { - needWaitFinish = false; - } - auto actorSystem = GetActorSystem(); - auto onResolve = [actorSystem, actorId = this->SelfActorId] { - actorSystem->Send(new IEventHandle(actorId, actorId, new TEvPrivate::TEvFutureResolved())); - }; - auto future = ArrowReader->GetSchema(fileDesc); - future.Subscribe([onResolve](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) { - onResolve(); - }); - WaitEvent<TEvPrivate::TEvFutureResolved>(); - auto result = future.GetValue(); - std::shared_ptr<arrow::Schema> schema = result.Schema; - std::vector<int> columnIndices; - std::vector<TColumnConverter> columnConverters; - columnConverters.reserve(ReadSpec->ArrowSchema->num_fields()); - for (int i = 0; i < ReadSpec->ArrowSchema->num_fields(); ++i) { - const auto& targetField = ReadSpec->ArrowSchema->field(i); - auto srcFieldIndex = schema->GetFieldIndex(targetField->name()); - YQL_ENSURE(srcFieldIndex != -1, "Missing field: " << targetField->name()); - auto targetType = targetField->type(); - auto originalType = schema->field(srcFieldIndex)->type(); - YQL_ENSURE(!originalType->layout().has_dictionary, "Unsupported dictionary encoding is used for field: " - << targetField->name() << ", type: " << originalType->ToString()); - columnIndices.push_back(srcFieldIndex); - auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name()); - YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec"); - columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second)); - } + for (int group = 0; group < fileReader->num_row_groups(); group++) { - fileDesc.Cookie = result.Cookie; - TArrowParquetBatchReader reader(std::move(fileDesc), - ArrowReader, - result.NumRowGroups, - std::move(columnIndices), - std::move(columnConverters), - onResolve, - [&] { WaitEvent<TEvPrivate::TEvFutureResolved>(); }); - ProcessBatches<std::shared_ptr<arrow::RecordBatch>, TEvPrivate::TEvNextRecordBatch>(reader, isLocal); - } else { - std::unique_ptr<NDB::ReadBuffer> buffer; - if (isLocal) { - buffer = std::make_unique<NDB::ReadBufferFromFile>(Url.substr(7) + Path); - } else { - buffer = std::make_unique<TReadBufferFromStream>(this); - } + std::shared_ptr<arrow::Table> table; + THROW_ARROW_NOT_OK(fileReader->ReadRowGroup(group, columnIndices, &table)); - const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); - YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); + auto reader = std::make_unique<arrow::TableBatchReader>(*table); - auto stream = 
std::make_unique<NDB::InputStreamFromInputFormat>(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->CHColumns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings)); - TBlockReader reader(std::move(stream)); - ProcessBatches<NDB::Block, TEvPrivate::TEvNextBlock>(reader, isLocal); + std::shared_ptr<arrow::RecordBatch> batch; + ::arrow::Status status; + while (status = reader->ReadNext(&batch), status.ok() && batch) { + if (++cntBlocksInFly > MaxBlocksInFly) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); + --cntBlocksInFly; + } + Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( + ConvertArrowColumns(batch, columnConverters), PathIndex, [actorSystem, selfId]() { + actorSystem->Send(new IEventHandle(selfId, TActorId{}, new TEvPrivate::TEvBlockProcessed())); + }, TakeIngressDelta() + )); } - } catch (const TS3ReadError&) { - // Finish reading. Add error from server to issues - LOG_CORO_D("TS3ReadCoroImpl", "S3 read error. Path: " << Path); - } catch (const TDtorException&) { - throw; - } catch (const NDB::Exception& err) { - TStringBuilder msgBuilder; - msgBuilder << err.message(); - if (err.code()) { - msgBuilder << " (code: " << err.code() << ")"; + if (!status.ok()) { + throw yexception() << status.ToString(); } - exceptIssue.SetMessage(msgBuilder); - fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; - RetryStuff->Cancel(); - } catch (const std::exception& err) { - exceptIssue.SetMessage(err.what()); - fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; - RetryStuff->Cancel(); + } + while (cntBlocksInFly--) { + WaitForSpecificEvent<TEvPrivate::TEvBlockProcessed>(); } - if (needWaitFinish) { - WaitFinish(); + LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED"); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvReadStarted, Handle); + hFunc(TEvPrivate::TEvDataPart, Handle); + hFunc(TEvPrivate::TEvReadFinished, Handle); + hFunc(TEvPrivate::TEvPause, Handle); + hFunc(TEvPrivate::TEvContinue, Handle); + hFunc(NActors::TEvents::TEvPoison, Handle); + ) + + void ProcessOneEvent() { + if (!Paused && !DeferredDataParts.empty()) { + ExtractDataPart(*DeferredDataParts.front(), true); + DeferredDataParts.pop(); + if (DeferredQueueSize) { + DeferredQueueSize->Dec(); + } + return; } + TAutoPtr<::NActors::IEventHandle> ev(WaitForEvent().Release()); + StateFunc(ev, GetActorContext()); + } + + void ExtractDataPart(TEvPrivate::TEvDataPart& event, bool deferred = false) { + InputBuffer = event.Result.Extract(); + IngressBytes += InputBuffer.size(); + RetryStuff->Offset += InputBuffer.size(); + RetryStuff->SizeLimit -= InputBuffer.size(); + LastOffset = RetryStuff->Offset; + LastData = InputBuffer; + LOG_CORO_T("TEvDataPart (" << (deferred ? 
"deferred" : "instant") << "), size: " << InputBuffer.size()); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + } + void Handle(TEvPrivate::TEvReadStarted::TPtr& ev) { + HttpResponseCode = ev->Get()->HttpResponseCode; + LOG_CORO_D("TEvReadStarted, Http code: " << HttpResponseCode); + } + + void Handle(TEvPrivate::TEvDataPart::TPtr& ev) { + if (HttpDataRps) { + HttpDataRps->Inc(); + } + if (200L == HttpResponseCode || 206L == HttpResponseCode) { + if (Paused || !DeferredDataParts.empty()) { + DeferredDataParts.push(std::move(ev->Release())); + if (DeferredQueueSize) { + DeferredQueueSize->Inc(); + } + } else { + ExtractDataPart(*ev->Get(), false); + } + } else if (HttpResponseCode && !RetryStuff->IsCancelled() && !RetryStuff->NextRetryDelay) { + ServerReturnedError = true; + if (ErrorText.size() < 256_KB) + ErrorText.append(ev->Get()->Result.Extract()); + else if (!ErrorText.EndsWith(TruncatedSuffix)) + ErrorText.append(TruncatedSuffix); + LOG_CORO_W("TEvDataPart, ERROR: " << ErrorText << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); + } + } + + void Handle(TEvPrivate::TEvReadFinished::TPtr& ev) { + + Issues.Clear(); if (!ErrorText.empty()) { TString errorCode; TString message; @@ -1550,66 +1580,149 @@ private: Issues.AddIssues(BuildIssues(HttpResponseCode, errorCode, message)); } - if (exceptIssue.GetMessage()) { - Issues.AddIssue(exceptIssue); + if (ev->Get()->Issues) { + Issues.AddIssues(ev->Get()->Issues); } - auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues)); - if (issues) - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); - else - Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, GetIngressDelta())); - } catch (const TS3ReadAbort&) { - LOG_CORO_D("TS3ReadCoroImpl", "S3 read abort. Path: " << Path); - } catch (const TDtorException&) { - return RetryStuff->Cancel(); - } catch (const std::exception& err) { - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what())}, NYql::NDqProto::StatusIds::INTERNAL_ERROR)); - return; - } - - template <typename T, typename TEv> - void ProcessBatches(IBatchReader<T>& reader, bool isLocal) { - auto actorSystem = GetActorSystem(); - auto selfActorId = SelfActorId; - size_t cntBlocksInFly = 0; - if (isLocal) { - for (;;) { - T batch; + if (HttpResponseCode >= 300) { + ServerReturnedError = true; + Issues.AddIssue(TIssue{TStringBuilder() << "HTTP error code: " << HttpResponseCode}); + } - if (!reader.Next(batch)) { - break; - } - if (++cntBlocksInFly > MaxBlocksInFly) { - WaitEvent<TEvPrivate::TEvBlockProcessed>(); - --cntBlocksInFly; - } - Send(ParentActorId, new TEv(batch, PathIndex, [actorSystem, selfActorId]() { - actorSystem->Send(new IEventHandle(selfActorId, selfActorId, new TEvPrivate::TEvBlockProcessed())); - }, GetIngressDelta())); + if (Issues) { + RetryStuff->NextRetryDelay = RetryStuff->GetRetryState()->GetNextRetryDelay(HttpResponseCode >= 300 ? 
HttpResponseCode : 0); + LOG_CORO_D("TEvReadFinished with Issues (try to retry): " << Issues.ToOneLineString()); + if (!RetryStuff->NextRetryDelay) { + RetryStuff->RetryState = nullptr; + InputFinished = true; + LOG_CORO_W("ReadError: " << Issues.ToOneLineString() << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText()); + throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format } - while (cntBlocksInFly--) { - WaitEvent<TEvPrivate::TEvBlockProcessed>(); + } + + if (!RetryStuff->IsCancelled() && RetryStuff->NextRetryDelay && RetryStuff->SizeLimit > 0ULL) { + GetActorSystem()->Schedule(*RetryStuff->NextRetryDelay, new IEventHandle(ParentActorId, SelfActorId, new TEvPrivate::TEvRetryEventFunc(std::bind(&DownloadStart, RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize)))); + InputBuffer.clear(); + if (DeferredDataParts.size()) { + if (DeferredQueueSize) { + DeferredQueueSize->Sub(DeferredDataParts.size()); + } + std::queue<THolder<TEvPrivate::TEvDataPart>> tmp; + DeferredDataParts.swap(tmp); } } else { - for (;;) { - T batch; - if (!reader.Next(batch)) { - break; + LOG_CORO_D("TEvReadFinished, LastOffset: " << LastOffset << ", Error: " << ServerReturnedError); + InputFinished = true; + if (ServerReturnedError) { + throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format + } + } + } + + void Handle(TEvPrivate::TEvPause::TPtr&) { + LOG_CORO_D("TEvPause"); + Paused = true; + } + + void Handle(TEvPrivate::TEvContinue::TPtr&) { + LOG_CORO_D("TEvContinue"); + Paused = false; + } + + void Handle(NActors::TEvents::TEvPoison::TPtr&) { + RetryStuff->Cancel(); + throw TS3ReadAbort(); + } + +private: + static constexpr std::string_view TruncatedSuffix = "... 
[truncated]"sv; +public: + TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, + const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, + const TString& path, const TString& url, const std::size_t maxBlocksInFly, IArrowReader::TPtr arrowReader, + const TS3ReadActorFactoryConfig& readActorFactoryCfg, + const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, + const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize, + const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps) + : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), + TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), + PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly), ArrowReader(arrowReader), + DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps) + {} + + ~TS3ReadCoroImpl() override { + if (DeferredDataParts.size() && DeferredQueueSize) { + DeferredQueueSize->Sub(DeferredDataParts.size()); + } + } + +private: + ui64 TakeIngressDelta() { + auto currentIngressBytes = IngressBytes; + IngressBytes = 0; + return currentIngressBytes; + } + + void Run() final { + + NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR; + try { + if (ReadSpec->Arrow) { + if (ReadSpec->Compression) { + Issues.AddIssue(TIssue("Blocks optimisations are incompatible with external compression, use Pragma DisableUseBlocks")); + fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; + } else if (ReadSpec->ThreadPool) { + RunThreadPoolBlockArrowParser(); + } else { + if (Url.StartsWith("file://")) { + RunCoroBlockArrowParserOverFile(); + } else { + RunCoroBlockArrowParserOverHttp(); + } + } + } else { + try { + if (Url.StartsWith("file://")) { + RunClickHouseParserOverFile(); + } else { + RunClickHouseParserOverHttp(); + } + } catch (const TS3ReadError&) { + // Just to avoid parser error after transport failure + LOG_CORO_D("S3 read ERROR"); + } catch (const NDB::Exception& err) { + TStringBuilder msgBuilder; + msgBuilder << err.message(); + if (err.code()) { + msgBuilder << " (code: " << err.code() << ")"; + } + Issues.AddIssue(TIssue(msgBuilder)); + fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; + RetryStuff->Cancel(); } - Send(ParentActorId, new TEv(batch, PathIndex, [](){}, GetIngressDelta())); } + } catch (const TS3ReadAbort&) { + // Poison handler actually + LOG_CORO_D("S3 read ABORT"); + } catch (const TDtorException&) { + // Stop any activity instantly + RetryStuff->Cancel(); + return; + } catch (const std::exception& err) { + Issues.AddIssue(TIssue(err.what())); + fatalCode = NYql::NDqProto::StatusIds::INTERNAL_ERROR; + RetryStuff->Cancel(); } + + auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << Path, std::move(Issues)); + if (issues) + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(issues), fatalCode)); + else + Send(ParentActorId, new TEvPrivate::TEvFileFinished(PathIndex, TakeIngressDelta())); } void ProcessUnexpectedEvent(TAutoPtr<IEventHandle> ev) final { - TStringBuilder message; - message << "Error while reading file " << Path << ", details: " - << "S3 read. 
Unexpected message type " << Hex(ev->GetTypeRewrite()); - if (auto* eventBase = ev->GetBase()) { - message << " (" << eventBase->ToStringHeader() << ")"; - } - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(message)}, NYql::NDqProto::StatusIds::INTERNAL_ERROR)); + return StateFunc(ev, GetActorContext()); } TString GetLastDataAsText() { @@ -1667,8 +1780,9 @@ private: std::size_t MaxBlocksInFly = 2; IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; + TString InputBuffer; bool Paused = false; - std::queue<THolder<IEventHandle>> DeferredEvents; + std::queue<THolder<TEvPrivate::TEvDataPart>> DeferredDataParts; const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize; const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps; @@ -1676,26 +1790,13 @@ private: class TS3ReadCoroActor : public TActorCoro { public: - TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff, size_t pathIndex, bool isDownloadNeeded, const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize) + TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl) : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) - , RetryStuff(std::move(retryStuff)) - , PathIndex(pathIndex) - , IsDownloadNeeded(isDownloadNeeded) - , HttpInflightSize(httpInflightSize) {} private: void Registered(TActorSystem* actorSystem, const TActorId& parent) override { TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself. - if (IsDownloadNeeded && RetryStuff->Url.substr(0, 6) != "file://") { - LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_COMPUTE, "TS3ReadCoroActor" << ": " << SelfId() << ", TxId: " << RetryStuff->TxId << ". 
" << "Start Download, Url: " << RetryStuff->Url << ", Offset: " << RetryStuff->Offset << ", request id: [" << RetryStuff->RequestId << "]"); - DownloadStart(RetryStuff, actorSystem, SelfId(), parent, PathIndex, HttpInflightSize); - } } - - const TRetryStuff::TPtr RetryStuff; - const size_t PathIndex; - const bool IsDownloadNeeded; - const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize; }; class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqComputeActorAsyncInput { @@ -1821,7 +1922,7 @@ public: InputIndex, TxId, ComputeActorId, - stuff, + std::move(stuff), ReadSpec, pathIndex, objectPath.Path, @@ -1832,10 +1933,7 @@ public: DeferredQueueSize, HttpInflightSize, HttpDataRps); - auto isDownloadNeeded = impl->IsDownloadNeeded(); - const auto& httpInflightSize = impl->HttpInflightSize; - CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor( - std::move(impl), std::move(stuff), pathIndex, isDownloadNeeded, httpInflightSize))); + CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(std::move(impl)))); } TObjectPath ReadPathFromCache() { @@ -2411,6 +2509,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto readSpec = std::make_shared<TReadSpec>(); readSpec->Arrow = params.GetArrow(); + readSpec->ThreadPool = params.GetThreadPool(); if (readSpec->Arrow) { arrow::SchemaBuilder builder; const TStringBuf blockLengthColumn("_yql_block_length"sv); diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 06b69e4d1e..7a8e65a476 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -77,6 +77,11 @@ "Match": {"Type": "Callable", "Name": "S3ArrowSettings"} }, { + "Name": "TS3CoroArrowSettings", + "Base": "TS3ParseSettingsBase", + "Match": {"Type": "Callable", "Name": "S3CoroArrowSettings"} + }, + { "Name": "TS3Read", "Base": "TFreeArgCallable", "Match": {"Type": "Callable", "Name": "Read!"}, diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index 3b01e90ec1..c174ce2cd9 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -16,4 +16,5 @@ message TSource { optional string Format = 5; map<string, string> Settings = 6; bool Arrow = 7; + bool ThreadPool = 8; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 0e1c9a218c..00aeccd4cc 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -111,6 +111,7 @@ public: AddHandler({TS3SourceSettings::CallableName()}, Hndl(&TSelf::HandleS3SourceSettings)); AddHandler({TS3ParseSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase)); AddHandler({TS3ArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase)); + AddHandler({TS3CoroArrowSettings::CallableName()}, Hndl(&TSelf::HandleS3ParseSettingsBase)); AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig)); } @@ -177,7 +178,7 @@ public: } const TTypeAnnotationNode* itemType = nullptr; - if (input->Content() == TS3ArrowSettings::CallableName()) { + if (input->Content() == TS3ArrowSettings::CallableName() || input->Content() == TS3CoroArrowSettings::CallableName()) { 
TVector<const TItemExprType*> blockRowTypeItems; for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) { blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType()))); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 28cabdb079..3a21c6929a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -192,10 +192,10 @@ public: YQL_ENSURE(resolveStatus != IArrowResolver::ERROR); supportedArrowTypes = resolveStatus == IArrowResolver::OK; } - return Build<TDqSourceWrap>(ctx, read->Pos()) .Input<TS3ParseSettingsBase>() - .CallableName((supportedArrowTypes && format == "parquet") ? TS3ArrowSettings::CallableName() : + .CallableName((supportedArrowTypes && format == "parquet") ? + (State_->Configuration->ArrowThreadPool.Get().GetOrElse(true) ? TS3ArrowSettings::CallableName() : TS3CoroArrowSettings::CallableName() ): TS3ParseSettings::CallableName()) .Paths(s3ReadObject.Object().Paths()) .Token<TCoSecureParam>() @@ -277,7 +277,8 @@ public: if (const auto mayParseSettings = settings.Maybe<TS3ParseSettingsBase>()) { const auto parseSettings = mayParseSettings.Cast(); srcDesc.SetFormat(parseSettings.Format().StringValue().c_str()); - srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>())); + srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()) || bool(parseSettings.Maybe<TS3CoroArrowSettings>())); + srcDesc.SetThreadPool(bool(parseSettings.Maybe<TS3ArrowSettings>())); const TStructExprType* fullRowType = parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); // exclude extra columns to get actual row type we need to read from input diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index bd3e92d89a..997090570f 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -514,7 +514,7 @@ public: } } - if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>()) { + if (outputRowDataItems.size() == 0 && readRowDataItems.size() != 0 && !parseSettingsBase.Maybe<TS3ArrowSettings>() && !parseSettingsBase.Maybe<TS3CoroArrowSettings>()) { const TStructExprType* readRowDataType = ctx.MakeType<TStructExprType>(readRowDataItems); auto item = GetLightColumn(*readRowDataType); YQL_ENSURE(item); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 72d7a691a0..a60dfa7639 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -15,6 +15,7 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, SerializeMemoryLimit); REGISTER_SETTING(*this, InFlightMemoryLimit); REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000); + REGISTER_SETTING(*this, ArrowThreadPool); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 86bb1ed5c9..e2f5caa3cd 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h 
@@ -17,6 +17,7 @@ struct TS3Settings {
     NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions.
     NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink.
     NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000.
+    NCommon::TConfSetting<bool, false> ArrowThreadPool;
 };
 
 struct TS3ClusterSettings {