diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-18 17:38:09 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-18 17:38:09 +0300 |
commit | 47a67a837880704fa43a9f06152f6e0127cc285b (patch) | |
tree | da547f5fcd1b54479c65521088c3da0ef48e7080 | |
parent | 30443074f4257ff168e18950a5e9ddf04c8d049d (diff) | |
download | ydb-47a67a837880704fa43a9f06152f6e0127cc285b.tar.gz |
YQ-888 RangesMode setting in YQ config
RangesMode setting in YQ config
ref:e84905dff4de7444fd175aee34c64978e7c747d1
4 files changed, 24 insertions, 10 deletions
diff --git a/ydb/core/yq/libs/config/protos/read_actors_factory.proto b/ydb/core/yq/libs/config/protos/read_actors_factory.proto index 6addaab49f0..ff70a6dd528 100644 --- a/ydb/core/yq/libs/config/protos/read_actors_factory.proto +++ b/ydb/core/yq/libs/config/protos/read_actors_factory.proto @@ -12,6 +12,11 @@ message TS3ReadActorFactoryConfig { NYql.NS3.TRetryConfig RetryConfig = 1; } +message TPqReadActorFactoryConfig { + bool CookieCommitMode = 1; // Turn off RangesMode setting in PQ read session. +} + message TReadActorsFactoryConfig { TS3ReadActorFactoryConfig S3ReadActorFactoryConfig = 1; + TPqReadActorFactoryConfig PqReadActorFactoryConfig = 2; } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 4faa00b74aa..0e2e24a05ae 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -178,7 +178,7 @@ void Init( if (protoConfig.GetTokenAccessor().GetEnabled()) { credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(protoConfig.GetTokenAccessor().GetEndpoint(), protoConfig.GetTokenAccessor().GetUseSsl()); - RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory); + RegisterDqPqReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*sourceActorFactory, yqSharedResources->YdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*sourceActorFactory, credentialsFactory, httpGateway, std::make_shared<NYql::NS3::TRetryConfig>(protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetRetryConfig())); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index e26956545e9..e175c385392 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -100,11 +100,13 @@ public: NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, ICallbacks* callbacks, - i64 bufferSize) + i64 bufferSize, + bool rangesMode) : TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc) , InputIndex(inputIndex) , TxId(txId) , BufferSize(bufferSize) + , RangesMode(rangesMode) , HolderFactory(holderFactory) , Driver(std::move(driver)) , CredentialsProviderFactory(std::move(credentialsProviderFactory)) @@ -273,7 +275,8 @@ private: .AppendTopics(topicReadSettings) .ConsumerName(SourceParams.GetConsumerName()) .MaxMemoryUsageBytes(BufferSize) - .StartingMessageTimestamp(StartingMessageTimestamp); + .StartingMessageTimestamp(StartingMessageTimestamp) + .RangesMode(RangesMode); } void UpdateStateWithNewReadData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) { @@ -350,6 +353,7 @@ private: const ui64 InputIndex; const TString TxId; const i64 BufferSize; + const bool RangesMode; const THolderFactory& HolderFactory; NYdb::TDriver Driver; std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory; @@ -376,7 +380,8 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IDqSourceActor::ICallbacks* callbacks, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - i64 bufferSize + i64 bufferSize, + bool rangesMode ) { auto taskParamsIt = taskParams.find("pq"); @@ -398,15 +403,16 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor( std::move(driver), CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken), callbacks, - bufferSize + bufferSize, + rangesMode ); return {actor, actor}; } -void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { +void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) { factory.Register<NPq::NProto::TDqPqTopicSource>("PqSource", - [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory)]( + [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode]( NPq::NProto::TDqPqTopicSource&& settings, IDqSourceActorFactory::TArguments&& args) { @@ -420,7 +426,9 @@ void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver drive driver, credentialsFactory, args.Callback, - args.HolderFactory); + args.HolderFactory, + PQReadDefaultFreeSpace, + rangesMode); }); } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h index d7e2c82dd04..19d60503f2b 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h @@ -32,9 +32,10 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IDqSourceActor::ICallbacks* callback, const NKikimr::NMiniKQL::THolderFactory& holderFactory, - i64 bufferSize = PQReadDefaultFreeSpace + i64 bufferSize = PQReadDefaultFreeSpace, + bool rangesMode = true ); -void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); +void RegisterDqPqReadActorFactory(TDqSourceFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true); } // namespace NYql::NDq |