aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-18 17:38:09 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-02-18 17:38:09 +0300
commit47a67a837880704fa43a9f06152f6e0127cc285b (patch)
treeda547f5fcd1b54479c65521088c3da0ef48e7080
parent30443074f4257ff168e18950a5e9ddf04c8d049d (diff)
downloadydb-47a67a837880704fa43a9f06152f6e0127cc285b.tar.gz
YQ-888 RangesMode setting in YQ config
RangesMode setting in YQ config ref:e84905dff4de7444fd175aee34c64978e7c747d1
-rw-r--r--ydb/core/yq/libs/config/protos/read_actors_factory.proto5
-rw-r--r--ydb/core/yq/libs/init/init.cpp2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp22
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h5
4 files changed, 24 insertions, 10 deletions
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
index 6addaab49f0..ff70a6dd528 100644
--- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto
+++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto
@@ -12,6 +12,11 @@ message TS3ReadActorFactoryConfig {
NYql.NS3.TRetryConfig RetryConfig = 1;
}
+message TPqReadActorFactoryConfig {
+ bool CookieCommitMode = 1; // Turn off RangesMode setting in PQ read session.
+}
+
message TReadActorsFactoryConfig {
TS3ReadActorFactoryConfig S3ReadActorFactoryConfig = 1;
+ TPqReadActorFactoryConfig PqReadActorFactoryConfig = 2;
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 4faa00b74aa..0e2e24a05ae 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -178,7 +178,7 @@ void Init(
if (protoConfig.GetTokenAccessor().GetEnabled()) {
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(protoConfig.GetTokenAccessor().GetEndpoint(), protoConfig.GetTokenAccessor().GetUseSsl());
- RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory);
+ RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory,
httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig()));
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index e26956545e9..e175c385392 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -100,11 +100,13 @@ public:
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
ICallbacks* callbacks,
- i64 bufferSize)
+ i64 bufferSize,
+ bool rangesMode)
: TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
, InputIndex(inputIndex)
, TxId(txId)
, BufferSize(bufferSize)
+ , RangesMode(rangesMode)
, HolderFactory(holderFactory)
, Driver(std::move(driver))
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
@@ -273,7 +275,8 @@ private:
.AppendTopics(topicReadSettings)
.ConsumerName(SourceParams.GetConsumerName())
.MaxMemoryUsageBytes(BufferSize)
- .StartingMessageTimestamp(StartingMessageTimestamp);
+ .StartingMessageTimestamp(StartingMessageTimestamp)
+ .RangesMode(RangesMode);
}
void UpdateStateWithNewReadData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) {
@@ -350,6 +353,7 @@ private:
const ui64 InputIndex;
const TString TxId;
const i64 BufferSize;
+ const bool RangesMode;
const THolderFactory& HolderFactory;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
@@ -376,7 +380,8 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IDqSourceActor::ICallbacks* callbacks,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- i64 bufferSize
+ i64 bufferSize,
+ bool rangesMode
)
{
auto taskParamsIt = taskParams.find("pq");
@@ -398,15 +403,16 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor(
std::move(driver),
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
callbacks,
- bufferSize
+ bufferSize,
+ rangesMode
);
return {actor, actor};
}
-void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
+void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) {
factory.Register<NPq::NProto::TDqPqTopicSource>("PqSource",
- [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory)](
+ [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode](
NPq::NProto::TDqPqTopicSource&& settings,
IDqSourceActorFactory::TArguments&& args)
{
@@ -420,7 +426,9 @@ void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver drive
driver,
credentialsFactory,
args.Callback,
- args.HolderFactory);
+ args.HolderFactory,
+ PQReadDefaultFreeSpace,
+ rangesMode);
});
}
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
index d7e2c82dd04..19d60503f2b 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
@@ -32,9 +32,10 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IDqSourceActor::ICallbacks* callback,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
- i64 bufferSize = PQReadDefaultFreeSpace
+ i64 bufferSize = PQReadDefaultFreeSpace,
+ bool rangesMode = true
);
-void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true);
} // namespace NYql::NDq