diff options
author | hor911 <hor911@ydb.tech> | 2023-05-23 20:07:03 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-05-23 20:07:03 +0300 |
commit | 78300d3d4b282c3c948e3ad181ec183ce32adcc5 (patch) | |
tree | 8be7706d2369f8467afbb0d08198b157af0b4c02 | |
parent | 0d0afc40a0d7e0535403cfe00efb2b1e5a9dfaef (diff) | |
download | ydb-78300d3d4b282c3c948e3ad181ec183ce32adcc5.tar.gz |
Remove s3.ArrowThreadPool settings
9 files changed, 11 insertions, 107 deletions
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 669e09ec83..d28bf071e3 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -1781,10 +1781,6 @@ private: *gatewaysConfig.MutableS3() = Params.Config.GetGateways().GetS3(); gatewaysConfig.MutableS3()->ClearClusterMapping(); - auto* attr = gatewaysConfig.MutableS3()->MutableDefaultSettings()->Add(); - attr->SetName("ArrowThreadPool"); - attr->SetValue("false"); - THashMap<TString, TString> clusters; TString monitoringEndpoint = Params.Config.GetCommon().GetMonitoringEndpoint(); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 3dc9673e3e..6937eceb63 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -159,7 +159,6 @@ struct TEvPrivate { EvNextRecordBatch, EvFileFinished, EvContinue, - EvFutureResolved, EvObjectPathBatch, EvObjectPathReadError, EvReadResult2, @@ -192,10 +191,6 @@ struct TEvPrivate { IHTTPGateway::TCountedContent Result; }; - struct TEvFutureResolved : public TEventLocal<TEvFutureResolved, EvFutureResolved> { - TEvFutureResolved() {} - }; - struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> { TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode) : CurlResponseCode(curlResponseCode), HttpResponseCode(httpResponseCode) {} @@ -999,7 +994,6 @@ struct TReadSpec { using TPtr = std::shared_ptr<TReadSpec>; bool Arrow = false; - bool ThreadPool = false; ui64 ParallelRowGroupCount = 0; bool RowGroupReordering = true; ui64 ParallelDownloadCount = 0; @@ -1513,73 +1507,6 @@ public: } } - void RunThreadPoolBlockArrowParser() { - - LOG_CORO_D("RunThreadPoolBlockArrowParser"); - - TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format); - - auto actorSystem = GetActorSystem(); - auto selfId = SelfActorId; - - auto future = ArrowReader->GetSchema(fileDesc); - future.Subscribe([actorSystem, selfId](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) { - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved())); - }); - - CpuTime += GetCpuTimeDelta(); - - WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - auto result = future.GetValue(); - - StartCycleCount = GetCycleCountFast(); - - fileDesc.Cookie = result.Cookie; - - std::shared_ptr<arrow::Schema> schema = result.Schema; - std::vector<int> columnIndices; - std::vector<TColumnConverter> columnConverters; - - BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters); - - for (int group = 0; group < result.NumRowGroups; group++) { - - auto future = ArrowReader->ReadRowGroup(fileDesc, group, columnIndices); - future.Subscribe([actorSystem, selfId](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){ - actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved())); - }); - - CpuTime += GetCpuTimeDelta(); - - WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - auto table = future.GetValue(); - - StartCycleCount = GetCycleCountFast(); - - auto reader = std::make_unique<arrow::TableBatchReader>(*table); - - std::shared_ptr<arrow::RecordBatch> batch; - ::arrow::Status status; - while (status = reader->ReadNext(&batch), status.ok() && batch) { - auto convertedBatch = ConvertArrowColumns(batch, columnConverters); - Paused = QueueBufferCounter->Add(NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch), SelfActorId); - Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch( - convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta() - )); - if (Paused) { - CpuTime += GetCpuTimeDelta(); - auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent); - HandleEvent(*ev); - StartCycleCount = GetCycleCountFast(); - } - } - if (!status.ok()) { - throw yexception() << status.ToString(); - } - } - LOG_CORO_D("RunThreadPoolBlockArrowParser FINISHED"); - } - struct TReadCache { ui64 Cookie = 0; TString Data; @@ -2089,7 +2016,7 @@ private: public: TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, - const TString& path, const TString& url, IArrowReader::TPtr arrowReader, + const TString& path, const TString& url, const TS3ReadActorFactoryConfig& readActorFactoryCfg, TReadBufferCounter::TPtr queueBufferCounter, const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize, @@ -2098,7 +2025,7 @@ public: const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize) : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), - PathIndex(pathIndex), Path(path), Url(url), ArrowReader(arrowReader), + PathIndex(pathIndex), Path(path), Url(url), QueueBufferCounter(queueBufferCounter), DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize), HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) { @@ -2148,14 +2075,10 @@ private: fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; } else { try { - if (ReadSpec->ThreadPool) { - RunThreadPoolBlockArrowParser(); + if (Url.StartsWith("file://")) { + RunCoroBlockArrowParserOverFile(); } else { - if (Url.StartsWith("file://")) { - RunCoroBlockArrowParserOverFile(); - } else { - RunCoroBlockArrowParserOverHttp(); - } + RunCoroBlockArrowParserOverHttp(); } } catch (const parquet::ParquetException& ex) { Issues.AddIssue(TIssue(ex.what())); @@ -2258,7 +2181,6 @@ private: std::size_t LastOffset = 0; TString LastData; - IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; TDuration CpuTime; ui64 StartCycleCount = 0; @@ -2300,7 +2222,6 @@ public: const TReadSpec::TPtr& readSpec, const NActors::TActorId& computeActorId, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, - IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters, @@ -2320,7 +2241,6 @@ public: , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) , ReadSpec(readSpec) - , ArrowReader(std::move(arrowReader)) , Counters(std::move(counters)) , TaskCounters(std::move(taskCounters)) , FileSizeLimit(fileSizeLimit) { @@ -2439,7 +2359,6 @@ public: pathIndex, objectPath.Path, Url, - ArrowReader, ReadActorFactoryCfg, QueueBufferCounter, DeferredQueueSize, @@ -2749,7 +2668,6 @@ private: size_t CompletedFiles = 0; const TReadSpec::TPtr ReadSpec; std::deque<TReadyBlock> Blocks; - IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; TDuration CpuTime; mutable TInstant LastMemoryReport = TInstant::Now(); @@ -2925,7 +2843,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, - IArrowReader::TPtr arrowReader, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters) { @@ -2987,7 +2904,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto readSpec = std::make_shared<TReadSpec>(); readSpec->Arrow = params.GetArrow(); - readSpec->ThreadPool = params.GetThreadPool(); readSpec->ParallelRowGroupCount = params.GetParallelRowGroupCount(); readSpec->RowGroupReordering = params.GetRowGroupReordering(); readSpec->ParallelDownloadCount = params.GetParallelDownloadCount(); @@ -3076,7 +2992,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( #undef SUPPORTED_FLAGS const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, - arrowReader, cfg, counters, taskCounters, fileSizeLimit); + cfg, counters, taskCounters, fileSizeLimit); return {actor, actor}; } else { diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 019a326a98..2b3e64b12a 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -5,7 +5,6 @@ #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> #include <ydb/library/yql/providers/s3/proto/source.pb.h> #include <ydb/library/yql/providers/common/token_accessor/client/factory.h> -#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h> #include <library/cpp/actors/core/actor.h> namespace NYql::NDq { @@ -25,7 +24,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, - IArrowReader::TPtr arrowReader, ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index 5eae61bc38..b84c033570 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -16,15 +16,14 @@ void RegisterS3ReadActorFactory( IHTTPGateway::TPtr gateway, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& cfg, - ::NMonitoring::TDynamicCounterPtr counters, - IArrowReader::TPtr arrowReader) { + ::NMonitoring::TDynamicCounterPtr counters) { #if defined(_linux_) || defined(_darwin_) NDB::registerFormats(); factory.RegisterSource<NS3::TSource>("S3Source", - [credentialsFactory, gateway, retryPolicy, cfg, counters, arrowReader](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { + [credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, - args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, arrowReader, + args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, counters, args.TaskCounters); }); #else diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index c78ff01a98..89bfc5009c 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -6,7 +6,6 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> #include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> -#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> #include <util/generic/size_literals.h> @@ -28,7 +27,6 @@ void RegisterS3ReadActorFactory( IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(), const TS3ReadActorFactoryConfig& = {}, - ::NMonitoring::TDynamicCounterPtr counters = nullptr, - IArrowReader::TPtr arrowReader = MakeArrowReader(TArrowReaderSettings())); + ::NMonitoring::TDynamicCounterPtr counters = nullptr); } diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto index 426cf90f21..a14fcc91b0 100644 --- a/ydb/library/yql/providers/s3/proto/source.proto +++ b/ydb/library/yql/providers/s3/proto/source.proto @@ -16,7 +16,7 @@ message TSource { optional string Format = 5; map<string, string> Settings = 6; bool Arrow = 7; - bool ThreadPool = 8; + bool ThreadPool = 8; // deprecated uint64 ParallelRowGroupCount = 9; bool RowGroupReordering = 10; uint64 ParallelDownloadCount = 11; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 5cf8f9d5b3..31628ec902 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -286,7 +286,6 @@ public: const auto parseSettings = mayParseSettings.Cast(); srcDesc.SetFormat(parseSettings.Format().StringValue().c_str()); srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>())); - srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true)); srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(0)); srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(true)); srcDesc.SetParallelDownloadCount(State_->Configuration->ParallelDownloadCount.Get().GetOrElse(0)); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 9734654fc7..af2346806b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -15,7 +15,6 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, SerializeMemoryLimit); REGISTER_SETTING(*this, InFlightMemoryLimit); REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000); - REGISTER_SETTING(*this, ArrowThreadPool); REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1); REGISTER_SETTING(*this, ArrowRowGroupReordering); REGISTER_SETTING(*this, ParallelDownloadCount); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 6a08fbd7b7..bce44b7844 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -17,7 +17,6 @@ struct TS3Settings { NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions. NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink. NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000. - NCommon::TConfSetting<bool, false> ArrowThreadPool; NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1 NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK NCommon::TConfSetting<ui64, false> ParallelDownloadCount; // Number of files to read in parallel, min == 1 |