aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2022-10-20 16:47:09 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2022-10-20 16:47:09 +0300
commit79a93b20094b7b8f0042e5cfef6af7d9894efeb7 (patch)
tree4b6712be124f964dd139d027e8c768d2f583e5f7
parentc50f614d69d440b320f6aa93288caa7802944195 (diff)
downloadydb-79a93b20094b7b8f0042e5cfef6af7d9894efeb7.tar.gz
RowsInBatch parameter for S3 read actor
-rw-r--r--ydb/core/yq/libs/config/protos/read_actors_factory.proto1
-rw-r--r--ydb/core/yq/libs/init/init.cpp9
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp26
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h5
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp7
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h7
6 files changed, 38 insertions, 17 deletions
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
index ff70a6dd528..337b639a6ce 100644
--- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto
+++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
@@ -10,6 +10,7 @@ import "ydb/library/yql/providers/s3/proto/retry_config.proto";
message TS3ReadActorFactoryConfig {
NYql.NS3.TRetryConfig RetryConfig = 1;
+ uint64 RowsInBatch = 2; // Default = 1000
}
message TPqReadActorFactoryConfig {
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index cca3261fec4..744828532ff 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -155,11 +155,16 @@ void Init(
}
if (protoConfig.GetPrivateApi().GetEnabled()) {
- auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one
+ const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
+ auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(TDuration::MilliSeconds(s3readConfig.GetRetryConfig().GetMaxRetryTimeMs())); // if MaxRetryTimeMs is not set, default http gateway will use the default one
+ NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg;
+ if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
+ readActorFactoryCfg.RowsInBatch = rowsInBatch;
+ }
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory,
- httpGateway, s3HttpRetryPolicy);
+ httpGateway, s3HttpRetryPolicy, readActorFactoryCfg);
RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
httpGateway, s3HttpRetryPolicy);
RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway);
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index c1af7c69d52..eb684ef5f07 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -23,6 +23,7 @@
#endif
#include "yql_s3_read_actor.h"
+#include "yql_s3_source_factory.h"
#include "yql_s3_actors_util.h"
#include <ydb/core/protos/services.pb.h>
@@ -417,8 +418,8 @@ private:
static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
public:
- TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path, const TString& url, const std::size_t maxBlocksInFly)
- : TActorCoroImpl(256_KB), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly)
+ TS3ReadCoroImpl(ui64 inputIndex, const TTxId& txId, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path, const TString& url, const std::size_t maxBlocksInFly, const TS3ReadActorFactoryConfig& readActorFactoryCfg)
+ : TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex), TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path), Url(url), MaxBlocksInFly(maxBlocksInFly)
{}
bool Next(TString& value) {
@@ -518,7 +519,7 @@ private:
}
const auto decompress(MakeDecompressor(*buffer, ReadSpec->Compression));
YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
- NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings));
+ NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : *buffer, NDB::Block(ReadSpec->Columns), nullptr, ReadActorFactoryCfg.RowsInBatch, ReadSpec->Settings));
auto actorSystem = GetActorSystem();
auto selfActorId = SelfActorId;
size_t cntBlocksInFly = 0;
@@ -621,6 +622,7 @@ private:
}
private:
+ const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const ui64 InputIndex;
const TTxId TxId;
const TRetryStuff::TPtr RetryStuff;
@@ -679,8 +681,10 @@ public:
const TReadSpec::TPtr& readSpec,
const NActors::TActorId& computeActorId,
const IRetryPolicy<long>::TPtr& retryPolicy,
- const std::size_t maxBlocksInFly
- ) : Gateway(std::move(gateway))
+ const std::size_t maxBlocksInFly,
+ const TS3ReadActorFactoryConfig& readActorFactoryCfg
+ ) : ReadActorFactoryCfg(readActorFactoryCfg)
+ , Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
, TxId(txId)
@@ -700,11 +704,11 @@ public:
LOG_D("TS3StreamReadActor", "Bootstrap");
Become(&TS3StreamReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
-
+
const TPath& path = Paths[pathInd];
auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path), TxId, RetryPolicy);
RetryStuffForFile.push_back(stuff);
- auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly);
+ auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path), Url, MaxBlocksInFly, ReadActorFactoryCfg);
RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release());
}
}
@@ -820,6 +824,7 @@ private:
}
}
+ const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const IHTTPGateway::TPtr Gateway;
std::vector<TRetryStuff::TPtr> RetryStuffForFile;
const THolderFactory& HolderFactory;
@@ -960,7 +965,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const IRetryPolicy<long>::TPtr& retryPolicy)
+ const IRetryPolicy<long>::TPtr& retryPolicy,
+ const TS3ReadActorFactoryConfig& cfg)
{
const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry();
@@ -1020,8 +1026,8 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (const auto it = settings.find("fileReadBlocksInFly"); settings.cend() != it)
maxBlocksInFly = FromString<ui64>(it->second);
const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
- std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, maxBlocksInFly);
-
+ std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, maxBlocksInFly, cfg);
+
return {actor, actor};
} else {
ui64 sizeLimit = std::numeric_limits<ui64>::max();
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 099740d829f..4369abf6fac 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -9,6 +9,8 @@
namespace NYql::NDq {
+struct TS3ReadActorFactoryConfig;
+
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadActor(
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
@@ -20,6 +22,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateS3ReadA
const THashMap<TString, TString>& taskParams,
const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- const IRetryPolicy<long>::TPtr& retryPolicy);
+ const IRetryPolicy<long>::TPtr& retryPolicy,
+ const TS3ReadActorFactoryConfig& cfg);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index b795c701b31..1ba4fafc04d 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -14,12 +14,13 @@ void RegisterS3ReadActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway,
- const IRetryPolicy<long>::TPtr& retryPolicy) {
+ const IRetryPolicy<long>::TPtr& retryPolicy,
+ const TS3ReadActorFactoryConfig& cfg) {
#if defined(_linux_) || defined(_darwin_)
NDB::registerFormats();
factory.RegisterSource<NS3::TSource>("S3Source",
- [credentialsFactory, gateway, retryPolicy](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
- return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy);
+ [credentialsFactory, gateway, retryPolicy, cfg](NS3::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) {
+ return CreateS3ReadActor(args.TypeEnv, args.HolderFactory, gateway, std::move(settings), args.InputIndex, args.TxId, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryPolicy, cfg);
});
#else
Y_UNUSED(factory);
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 83eb05d427e..7ee8b1d4c59 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -11,10 +11,15 @@
namespace NYql::NDq {
+struct TS3ReadActorFactoryConfig {
+ ui64 RowsInBatch = 1000;
+};
+
void RegisterS3ReadActorFactory(
TDqAsyncIoFactory& factory,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr gateway = IHTTPGateway::Make(),
- const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy());
+ const IRetryPolicy<long>::TPtr& retryPolicy = GetHTTPDefaultRetryPolicy(),
+ const TS3ReadActorFactoryConfig& = {});
}