aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-05-23 20:07:03 +0300
committerhor911 <hor911@ydb.tech>2023-05-23 20:07:03 +0300
commit78300d3d4b282c3c948e3ad181ec183ce32adcc5 (patch)
tree8be7706d2369f8467afbb0d08198b157af0b4c02
parent0d0afc40a0d7e0535403cfe00efb2b1e5a9dfaef (diff)
downloadydb-78300d3d4b282c3c948e3ad181ec183ce32adcc5.tar.gz
Remove s3.ArrowThreadPool settings
-rw-r--r--ydb/core/fq/libs/actors/run_actor.cpp4
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp96
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp7
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h4
-rw-r--r--ydb/library/yql/providers/s3/proto/source.proto2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h1
9 files changed, 11 insertions, 107 deletions
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp
index 669e09ec83..d28bf071e3 100644
--- a/ydb/core/fq/libs/actors/run_actor.cpp
+++ b/ydb/core/fq/libs/actors/run_actor.cpp
@@ -1781,10 +1781,6 @@ private:
*gatewaysConfig.MutableS3() = Params.Config.GetGateways().GetS3();
gatewaysConfig.MutableS3()->ClearClusterMapping();
- auto* attr = gatewaysConfig.MutableS3()->MutableDefaultSettings()->Add();
- attr->SetName("ArrowThreadPool");
- attr->SetValue("false");
-
THashMap<TString, TString> clusters;
TString monitoringEndpoint = Params.Config.GetCommon().GetMonitoringEndpoint();
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 3dc9673e3e..6937eceb63 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -159,7 +159,6 @@ struct TEvPrivate {
EvNextRecordBatch,
EvFileFinished,
EvContinue,
- EvFutureResolved,
EvObjectPathBatch,
EvObjectPathReadError,
EvReadResult2,
@@ -192,10 +191,6 @@ struct TEvPrivate {
IHTTPGateway::TCountedContent Result;
};
- struct TEvFutureResolved : public TEventLocal<TEvFutureResolved, EvFutureResolved> {
- TEvFutureResolved() {}
- };
-
struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> {
TEvReadStarted(CURLcode curlResponseCode, long httpResponseCode)
: CurlResponseCode(curlResponseCode), HttpResponseCode(httpResponseCode) {}
@@ -999,7 +994,6 @@ struct TReadSpec {
using TPtr = std::shared_ptr<TReadSpec>;
bool Arrow = false;
- bool ThreadPool = false;
ui64 ParallelRowGroupCount = 0;
bool RowGroupReordering = true;
ui64 ParallelDownloadCount = 0;
@@ -1513,73 +1507,6 @@ public:
}
}
- void RunThreadPoolBlockArrowParser() {
-
- LOG_CORO_D("RunThreadPoolBlockArrowParser");
-
- TArrowFileDesc fileDesc(Url + Path, RetryStuff->Gateway, RetryStuff->Headers, RetryStuff->RetryPolicy, RetryStuff->SizeLimit, ReadSpec->Format);
-
- auto actorSystem = GetActorSystem();
- auto selfId = SelfActorId;
-
- auto future = ArrowReader->GetSchema(fileDesc);
- future.Subscribe([actorSystem, selfId](const NThreading::TFuture<IArrowReader::TSchemaResponse>&) {
- actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved()));
- });
-
- CpuTime += GetCpuTimeDelta();
-
- WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- auto result = future.GetValue();
-
- StartCycleCount = GetCycleCountFast();
-
- fileDesc.Cookie = result.Cookie;
-
- std::shared_ptr<arrow::Schema> schema = result.Schema;
- std::vector<int> columnIndices;
- std::vector<TColumnConverter> columnConverters;
-
- BuildColumnConverters(ReadSpec->ArrowSchema, schema, columnIndices, columnConverters);
-
- for (int group = 0; group < result.NumRowGroups; group++) {
-
- auto future = ArrowReader->ReadRowGroup(fileDesc, group, columnIndices);
- future.Subscribe([actorSystem, selfId](const NThreading::TFuture<std::shared_ptr<arrow::Table>>&){
- actorSystem->Send(new IEventHandle(selfId, selfId, new TEvPrivate::TEvFutureResolved()));
- });
-
- CpuTime += GetCpuTimeDelta();
-
- WaitForSpecificEvent<TEvPrivate::TEvFutureResolved>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- auto table = future.GetValue();
-
- StartCycleCount = GetCycleCountFast();
-
- auto reader = std::make_unique<arrow::TableBatchReader>(*table);
-
- std::shared_ptr<arrow::RecordBatch> batch;
- ::arrow::Status status;
- while (status = reader->ReadNext(&batch), status.ok() && batch) {
- auto convertedBatch = ConvertArrowColumns(batch, columnConverters);
- Paused = QueueBufferCounter->Add(NUdf::GetSizeOfArrowBatchInBytes(*convertedBatch), SelfActorId);
- Send(ParentActorId, new TEvPrivate::TEvNextRecordBatch(
- convertedBatch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()
- ));
- if (Paused) {
- CpuTime += GetCpuTimeDelta();
- auto ev = WaitForSpecificEvent<TEvPrivate::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
- HandleEvent(*ev);
- StartCycleCount = GetCycleCountFast();
- }
- }
- if (!status.ok()) {
- throw yexception() << status.ToString();
- }
- }
- LOG_CORO_D("RunThreadPoolBlockArrowParser FINISHED");
- }
-
struct TReadCache {
ui64 Cookie = 0;
TString Data;
@@ -2089,7 +2016,7 @@ private:
public:
TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId,
const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex,
- const TString& path, const TString& url, IArrowReader::TPtr arrowReader,
+ const TString& path, const TString& url,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
TReadBufferCounter::TPtr queueBufferCounter,
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
@@ -2098,7 +2025,7 @@ public:
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
- PathIndex(pathIndex), Path(path), Url(url), ArrowReader(arrowReader),
+ PathIndex(pathIndex), Path(path), Url(url),
QueueBufferCounter(queueBufferCounter),
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) {
@@ -2148,14 +2075,10 @@ private:
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
} else {
try {
- if (ReadSpec->ThreadPool) {
- RunThreadPoolBlockArrowParser();
+ if (Url.StartsWith("file://")) {
+ RunCoroBlockArrowParserOverFile();
} else {
- if (Url.StartsWith("file://")) {
- RunCoroBlockArrowParserOverFile();
- } else {
- RunCoroBlockArrowParserOverHttp();
- }
+ RunCoroBlockArrowParserOverHttp();
}
} catch (const parquet::ParquetException& ex) {
Issues.AddIssue(TIssue(ex.what()));
@@ -2258,7 +2181,6 @@ private:
std::size_t LastOffset = 0;
TString LastData;
- IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
TDuration CpuTime;
ui64 StartCycleCount = 0;
@@ -2300,7 +2222,6 @@ public:
const TReadSpec::TPtr& readSpec,
const NActors::TActorId& computeActorId,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
- IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
@@ -2320,7 +2241,6 @@ public:
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
, ReadSpec(readSpec)
- , ArrowReader(std::move(arrowReader))
, Counters(std::move(counters))
, TaskCounters(std::move(taskCounters))
, FileSizeLimit(fileSizeLimit) {
@@ -2439,7 +2359,6 @@ public:
pathIndex,
objectPath.Path,
Url,
- ArrowReader,
ReadActorFactoryCfg,
QueueBufferCounter,
DeferredQueueSize,
@@ -2749,7 +2668,6 @@ private:
size_t CompletedFiles = 0;
const TReadSpec::TPtr ReadSpec;
std::deque<TReadyBlock> Blocks;
- IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
TDuration CpuTime;
mutable TInstant LastMemoryReport = TInstant::Now();
@@ -2925,7 +2843,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
- IArrowReader::TPtr arrowReader,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters)
{
@@ -2987,7 +2904,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto readSpec = std::make_shared<TReadSpec>();
readSpec->Arrow = params.GetArrow();
- readSpec->ThreadPool = params.GetThreadPool();
readSpec->ParallelRowGroupCount = params.GetParallelRowGroupCount();
readSpec->RowGroupReordering = params.GetRowGroupReordering();
readSpec->ParallelDownloadCount = params.GetParallelDownloadCount();
@@ -3076,7 +2992,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
#undef SUPPORTED_FLAGS
const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy,
- arrowReader, cfg, counters, taskCounters, fileSizeLimit);
+ cfg, counters, taskCounters, fileSizeLimit);
return {actor, actor};
} else {
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 019a326a98..2b3e64b12a 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -5,7 +5,6 @@
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
#include <ydb/library/yql/providers/s3/proto/source.pb.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
-#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h>
#include <library/cpp/actors/core/actor.h>
namespace NYql::NDq {
@@ -25,7 +24,6 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
- IArrowReader::TPtr arrowReader,
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters);
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index 5eae61bc38..b84c033570 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -16,15 +16,14 @@ void RegisterS3ReadActorFactory(
IHTTPGateway::TPtr gateway,
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& cfg,
- ::NMonitoring::TDynamicCounterPtr counters,
- IArrowReader::TPtr arrowReader) {
+ ::NMonitoring::TDynamicCounterPtr counters) {
#if defined(_linux_) || defined(_darwin_)
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
- [credentialsFactory, gateway, retryPolicy, cfg, counters, arrowReader](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
+ [credentialsFactory, gateway, retryPolicy, cfg, counters](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway,
std::move(settings), args.InputIndex, args.TxId, args.SecureParams,
- args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg, arrowReader,
+ args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg,
counters, args.TaskCounters);
});
#else
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index c78ff01a98..89bfc5009c 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -6,7 +6,6 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
-#include <ydb/library/yql/providers/common/arrow/interface/arrow_reader.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <util/generic/size_literals.h>
@@ -28,7 +27,6 @@ void RegisterS3ReadActorFactory(
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
const TS3ReadActorFactoryConfig& = {},
- ::NMonitoring::TDynamicCounterPtr counters = nullptr,
- IArrowReader::TPtr arrowReader = MakeArrowReader(TArrowReaderSettings()));
+ ::NMonitoring::TDynamicCounterPtr counters = nullptr);
}
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index 426cf90f21..a14fcc91b0 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -16,7 +16,7 @@ message TSource {
optional string Format = 5;
map<string, string> Settings = 6;
bool Arrow = 7;
- bool ThreadPool = 8;
+ bool ThreadPool = 8; // deprecated
uint64 ParallelRowGroupCount = 9;
bool RowGroupReordering = 10;
uint64 ParallelDownloadCount = 11;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 5cf8f9d5b3..31628ec902 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -286,7 +286,6 @@ public:
const auto parseSettings = mayParseSettings.Cast();
srcDesc.SetFormat(parseSettings.Format().StringValue().c_str());
srcDesc.SetArrow(bool(parseSettings.Maybe<TS3ArrowSettings>()));
- srcDesc.SetThreadPool(State_->Configuration->ArrowThreadPool.Get().GetOrElse(true));
srcDesc.SetParallelRowGroupCount(State_->Configuration->ArrowParallelRowGroupCount.Get().GetOrElse(0));
srcDesc.SetRowGroupReordering(State_->Configuration->ArrowRowGroupReordering.Get().GetOrElse(true));
srcDesc.SetParallelDownloadCount(State_->Configuration->ParallelDownloadCount.Get().GetOrElse(0));
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index 9734654fc7..af2346806b 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -15,7 +15,6 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, SerializeMemoryLimit);
REGISTER_SETTING(*this, InFlightMemoryLimit);
REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000);
- REGISTER_SETTING(*this, ArrowThreadPool);
REGISTER_SETTING(*this, ArrowParallelRowGroupCount).Lower(1);
REGISTER_SETTING(*this, ArrowRowGroupReordering);
REGISTER_SETTING(*this, ParallelDownloadCount);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 6a08fbd7b7..bce44b7844 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -17,7 +17,6 @@ struct TS3Settings {
NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions.
NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink.
NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000.
- NCommon::TConfSetting<bool, false> ArrowThreadPool;
NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1
NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK
NCommon::TConfSetting<ui64, false> ParallelDownloadCount; // Number of files to read in parallel, min == 1