diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-10-08 12:45:24 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-08 12:45:24 +0300 |
commit | dc9f4054d9ad170f88f3099c78caf3d58d095781 (patch) | |
tree | 75355778f33e62b301c258660a38addd6b56fa6f | |
parent | 61ddf6605c658d98ebd0f1e40b51418f0edce694 (diff) | |
download | ydb-dc9f4054d9ad170f88f3099c78caf3d58d095781.tar.gz |
YQ-3560 RowDispatcher: local mode to use in dqrun (#10072)
10 files changed, 28 insertions, 20 deletions
diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 10ca10285e..26e4ecbfc7 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -11,6 +11,7 @@ import "ydb/core/fq/libs/config/protos/storage.proto"; message TRowDispatcherCoordinatorConfig { TYdbStorageConfig Database = 1; string CoordinationNodePath = 2; + bool LocalMode = 3; // Use only local row_dispatcher. } message TRowDispatcherConfig { bool Enabled = 1; @@ -19,5 +20,4 @@ message TRowDispatcherConfig { uint64 MaxSessionUsedMemory = 4; bool WithoutConsumer = 5; TRowDispatcherCoordinatorConfig Coordinator = 6; - } diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 02d3d97652..343ed76781 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -192,7 +192,6 @@ void Init( if (protoConfig.GetRowDispatcher().GetEnabled()) { auto rowDispatcher = NFq::NewRowDispatcherService( protoConfig.GetRowDispatcher(), - protoConfig.GetCommon(), NKikimr::CreateYdbCredentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp index 6817cfc292..5f945ddc9f 100644 --- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp +++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp @@ -222,6 +222,10 @@ void TLeaderElection::Bootstrap() { Become(&TLeaderElection::StateFunc); LogPrefix = "TLeaderElection " + SelfId().ToString() + " "; LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString()); + if (Config.GetLocalMode()) { + TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId)); + return; + } ProcessState(); } diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 3d327385cf..f1d8b93d9a 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -108,7 +108,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { NConfig::TRowDispatcherConfig Config; - NConfig::TCommonConfig CommonConfig; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; TYqSharedResources::TPtr YqSharedResources; TMaybe<TActorId> CoordinatorActorId; @@ -171,7 +170,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> { public: explicit TRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -234,7 +232,6 @@ public: TRowDispatcher::TRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -242,7 +239,6 @@ TRowDispatcher::TRowDispatcher( const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory, const ::NMonitoring::TDynamicCounterPtr& counters) : Config(config) - , CommonConfig(commonConfig) , CredentialsProviderFactory(credentialsProviderFactory) , YqSharedResources(yqSharedResources) , CredentialsFactory(credentialsFactory) @@ -586,7 +582,6 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintState::TPtr&) { std::unique_ptr<NActors::IActor> NewRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -596,7 +591,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher( { return std::unique_ptr<NActors::IActor>(new TRowDispatcher( config, - commonConfig, credentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h index 54c3b1521a..ff71aab8bd 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h @@ -16,7 +16,6 @@ namespace NFq { std::unique_ptr<NActors::IActor> NewRowDispatcher( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp index 1300f419d7..e314da7001 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp @@ -11,7 +11,6 @@ using namespace NActors; std::unique_ptr<NActors::IActor> NewRowDispatcherService( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -20,7 +19,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService( { return NewRowDispatcher( config, - commonConfig, credentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h index ef8a9f2909..c3ee492c06 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h @@ -16,7 +16,6 @@ namespace NFq { std::unique_ptr<NActors::IActor> NewRowDispatcherService( const NConfig::TRowDispatcherConfig& config, - const NConfig::TCommonConfig& commonConfig, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, const TYqSharedResources::TPtr& yqSharedResources, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp index 93ccaa8c15..bdef440832 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp @@ -23,15 +23,18 @@ public: Runtime.Initialize(app->Unwrap()); Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG); auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory; - auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>())); + YqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>())); RowDispatcher = Runtime.AllocateEdgeActor(); Coordinator1 = Runtime.AllocateEdgeActor(); Coordinator2 = Runtime.AllocateEdgeActor(); Coordinator3 = Runtime.AllocateEdgeActor(); + } + void Init(bool localMode = false) { NConfig::TRowDispatcherCoordinatorConfig config; config.SetCoordinationNodePath("row_dispatcher"); + config.SetLocalMode(localMode); auto& database = *config.MutableDatabase(); database.SetEndpoint(GetEnv("YDB_ENDPOINT")); database.SetDatabase(GetEnv("YDB_DATABASE")); @@ -42,7 +45,7 @@ public: Coordinator1, config, NKikimr::CreateYdbCredentialsProviderFactory, - yqSharedResources, + YqSharedResources, "/tenant", MakeIntrusive<NMonitoring::TDynamicCounters>() ).release()); @@ -52,7 +55,7 @@ public: Coordinator2, config, NKikimr::CreateYdbCredentialsProviderFactory, - yqSharedResources, + YqSharedResources, "/tenant", MakeIntrusive<NMonitoring::TDynamicCounters>() ).release()); @@ -62,7 +65,7 @@ public: Coordinator3, config, NKikimr::CreateYdbCredentialsProviderFactory, - yqSharedResources, + YqSharedResources, "/tenant", MakeIntrusive<NMonitoring::TDynamicCounters>() ).release()); @@ -95,10 +98,12 @@ public: NActors::TActorId Coordinator2; NActors::TActorId Coordinator3; NActors::TActorId LeaderDetector; + TYqSharedResources::TPtr YqSharedResources; }; Y_UNIT_TEST_SUITE(LeaderElectionTests) { Y_UNIT_TEST_F(Test1, TFixture) { + Init(); auto coordinatorId1 = ExpectCoordinatorChanged(); auto coordinatorId2 = ExpectCoordinatorChanged(); @@ -134,7 +139,15 @@ Y_UNIT_TEST_SUITE(LeaderElectionTests) { auto coordinatorId6 = ExpectCoordinatorChanged(); UNIT_ASSERT(coordinatorId6 != coordinatorId4); } -} + Y_UNIT_TEST_F(TestLocalMode, TFixture) { + Init(true); + auto coordinatorId1 = ExpectCoordinatorChanged(); + auto coordinatorId2 = ExpectCoordinatorChanged(); + auto coordinatorId3 = ExpectCoordinatorChanged(); + TSet<NActors::TActorId> set {coordinatorId1, coordinatorId2, coordinatorId3}; + UNIT_ASSERT(set.size() == 3); + } } +} diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index f5641e8155..550c35447a 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -61,7 +61,6 @@ public: database.SetDatabase("YDB_DATABASE"); database.SetToken(""); - NConfig::TCommonConfig commonConfig; auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory; auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>())); @@ -74,7 +73,6 @@ public: RowDispatcher = Runtime.Register(NewRowDispatcher( config, - commonConfig, NKikimr::CreateYdbCredentialsProviderFactory, yqSharedResources, credentialsFactory, diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp index c882575dd1..55f284b8f8 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp @@ -78,7 +78,7 @@ private: class TFileTopicReadSession : public NYdb::NTopic::IReadSession { -constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100); +constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5); public: TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = ""): @@ -182,10 +182,14 @@ private: TString rawMsg; TVector<TMessage> msgs; size_t size = 0; + ui64 maxBatchRowSize = 100; while (size_t read = fi.ReadLine(rawMsg)) { msgs.emplace_back(MakeNextMessage(rawMsg)); MsgOffset_++; + if (!maxBatchRowSize--) { + break; + } size += rawMsg.size(); } if (!msgs.empty()) { |