diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-10-20 16:47:09 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-10-20 16:47:09 +0300 |
commit | 79a93b20094b7b8f0042e5cfef6af7d9894efeb7 (patch) | |
tree | 4b6712be124f964dd139d027e8c768d2f583e5f7 | |
parent | c50f614d69d440b320f6aa93288caa7802944195 (diff) | |
download | ydb-79a93b20094b7b8f0042e5cfef6af7d9894efeb7.tar.gz |
RowsInBatch parameter for S3 read actor
6 files changed, 38 insertions, 17 deletions
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto index ff70a6dd528..337b639a6ce 100644 --- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto +++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto @@ -10,6 +10,7 @@ import "ydb/library/yql/providers/s3/proto/retry_config.proto"; message TS3ReadActorFactoryConfig { NYql.NS3.TRetryConfig RetryConfig = 1; + uint64 RowsInBatch = 2; // Default = 1000 } message TPqReadActorFactoryConfig { diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index cca3261fec4..744828532ff 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -155,11 +155,16 @@ void Init( } if (protoConfig.GetPrivateApi().GetEnabled()) { - auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one + const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig(); + auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(s3readConfig.GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one + NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg; + if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) { + readActorFactoryCfg.RowsInBatch = rowsInBatch; + } RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, - httpGateway, s3HttpRetryPolicy); + httpGateway, s3HttpRetryPolicy, readActorFactoryCfg); RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy); RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index c1af7c69d52..eb684ef5f07 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -23,6 +23,7 @@ #endif #include "yql_s3_read_actor.h" +#include "yql_s3_source_factory.h" #include "yql_s3_actors_util.h" #include <ydb/core/protos/services.pb.h> @@ -417,8 +418,8 @@ private: static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv; public: - TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path, const TString& url, const std::size_t maxBlocksInFly) - : TActorCoroImpl(256_KB), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly) + TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path, const TString& url, const std::size_t maxBlocksInFly, const TS3ReadActorFactoryConfig& readActorFactoryCfg) + : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly) {} bool Next(TString& value) { @@ -518,7 +519,7 @@ private: } const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression)); YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); - NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings)); + NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->Columns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings)); auto actorSystem = GetActorSystem(); auto selfActorId = SelfActorId; size_t cntBlocksInFly = 0; @@ -621,6 +622,7 @@ private: } private: + const TS3ReadActorFactoryConfig ReadActorFactoryCfg; const ui64 InputIndex; const TTxId TxId; const TRetryStuff::TPtr RetryStuff; @@ -679,8 +681,10 @@ public: const TReadSpec::TPtr& readSpec, const NActors::TActorId& computeActorId, const IRetryPolicy<long>::TPtr& retryPolicy, - const std::size_t maxBlocksInFly - ) : Gateway(std::move(gateway)) + const std::size_t maxBlocksInFly, + const TS3ReadActorFactoryConfig& readActorFactoryCfg + ) : ReadActorFactoryCfg(readActorFactoryCfg) + , Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) , TxId(txId) @@ -700,11 +704,11 @@ public: LOG_D("TS3StreamReadActor", "Bootstrap"); Become(&TS3StreamReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { - + const TPath& path = Paths[pathInd]; auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId, RetryPolicy); RetryStuffForFile.push_back(stuff); - auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly); + auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg); RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release()); } } @@ -820,6 +824,7 @@ private: } } + const TS3ReadActorFactoryConfig ReadActorFactoryCfg; const IHTTPGateway::TPtr Gateway; std::vector<TRetryStuff::TPtr> RetryStuffForFile; const THolderFactory& HolderFactory; @@ -960,7 +965,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const IRetryPolicy<long>::TPtr& retryPolicy) + const IRetryPolicy<long>::TPtr& retryPolicy, + const TS3ReadActorFactoryConfig& cfg) { const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry(); @@ -1020,8 +1026,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( if (const auto it = settings.find("fileReadBlocksInFly"); settings.cend() != it) maxBlocksInFly = FromString<ui64>(it->second); const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, - std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, maxBlocksInFly); - + std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, maxBlocksInFly, cfg); + return {actor, actor}; } else { ui64 sizeLimit = std::numeric_limits<ui64>::max(); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 099740d829f..4369abf6fac 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -9,6 +9,8 @@ namespace NYql::NDq { +struct TS3ReadActorFactoryConfig; + std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor( const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, @@ -20,6 +22,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const IRetryPolicy<long>::TPtr& retryPolicy); + const IRetryPolicy<long>::TPtr& retryPolicy, + const TS3ReadActorFactoryConfig& cfg); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index b795c701b31..1ba4fafc04d 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -14,12 +14,13 @@ void RegisterS3ReadActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, - const IRetryPolicy<long>::TPtr& retryPolicy) { + const IRetryPolicy<long>::TPtr& retryPolicy, + const TS3ReadActorFactoryConfig& cfg) { #if defined(_linux_) || defined(_darwin_) NDB::registerFormats(); factory.RegisterSource<NS3::TSource>("S3Source", - [credentialsFactory, gateway, retryPolicy](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { - return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy); + [credentialsFactory, gateway, retryPolicy, cfg](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { + return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg); }); #else Y_UNUSED(factory); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 83eb05d427e..7ee8b1d4c59 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -11,10 +11,15 @@ namespace NYql::NDq { +struct TS3ReadActorFactoryConfig { + ui64 RowsInBatch = 1000; +}; + void RegisterS3ReadActorFactory( TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway = IHTTPGateway::Make(), - const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy()); + const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(), + const TS3ReadActorFactoryConfig& = {}); } |