aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-10-08 12:45:24 +0300
committerGitHub <noreply@github.com>2024-10-08 12:45:24 +0300
commitdc9f4054d9ad170f88f3099c78caf3d58d095781 (patch)
tree75355778f33e62b301c258660a38addd6b56fa6f
parent61ddf6605c658d98ebd0f1e40b51418f0edce694 (diff)
downloadydb-dc9f4054d9ad170f88f3099c78caf3d58d095781.tar.gz
YQ-3560 RowDispatcher: local mode to use in dqrun (#10072)
-rw-r--r--ydb/core/fq/libs/config/protos/row_dispatcher.proto2
-rw-r--r--ydb/core/fq/libs/init/init.cpp1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/leader_election.cpp4
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp6
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp2
-rw-r--r--ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp23
-rw-r--r--ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp2
-rw-r--r--ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp6
10 files changed, 28 insertions, 20 deletions
diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
index 10ca10285e..26e4ecbfc7 100644
--- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto
+++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto
@@ -11,6 +11,7 @@ import "ydb/core/fq/libs/config/protos/storage.proto";
message TRowDispatcherCoordinatorConfig {
TYdbStorageConfig Database = 1;
string CoordinationNodePath = 2;
+ bool LocalMode = 3; // Use only local row_dispatcher.
}
message TRowDispatcherConfig {
bool Enabled = 1;
@@ -19,5 +20,4 @@ message TRowDispatcherConfig {
uint64 MaxSessionUsedMemory = 4;
bool WithoutConsumer = 5;
TRowDispatcherCoordinatorConfig Coordinator = 6;
-
}
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index 02d3d97652..343ed76781 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -192,7 +192,6 @@ void Init(
if (protoConfig.GetRowDispatcher().GetEnabled()) {
auto rowDispatcher = NFq::NewRowDispatcherService(
protoConfig.GetRowDispatcher(),
- protoConfig.GetCommon(),
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
index 6817cfc292..5f945ddc9f 100644
--- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
@@ -222,6 +222,10 @@ void TLeaderElection::Bootstrap() {
Become(&TLeaderElection::StateFunc);
LogPrefix = "TLeaderElection " + SelfId().ToString() + " ";
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString());
+ if (Config.GetLocalMode()) {
+ TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId));
+ return;
+ }
ProcessState();
}
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
index 3d327385cf..f1d8b93d9a 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
@@ -108,7 +108,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
NConfig::TRowDispatcherConfig Config;
- NConfig::TCommonConfig CommonConfig;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
TYqSharedResources::TPtr YqSharedResources;
TMaybe<TActorId> CoordinatorActorId;
@@ -171,7 +170,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
public:
explicit TRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
- const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -234,7 +232,6 @@ public:
TRowDispatcher::TRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
- const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -242,7 +239,6 @@ TRowDispatcher::TRowDispatcher(
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters)
: Config(config)
- , CommonConfig(commonConfig)
, CredentialsProviderFactory(credentialsProviderFactory)
, YqSharedResources(yqSharedResources)
, CredentialsFactory(credentialsFactory)
@@ -586,7 +582,6 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintState::TPtr&) {
std::unique_ptr<NActors::IActor> NewRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
- const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -596,7 +591,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
{
return std::unique_ptr<NActors::IActor>(new TRowDispatcher(
config,
- commonConfig,
credentialsProviderFactory,
yqSharedResources,
credentialsFactory,
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
index 54c3b1521a..ff71aab8bd 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
@@ -16,7 +16,6 @@ namespace NFq {
std::unique_ptr<NActors::IActor> NewRowDispatcher(
const NConfig::TRowDispatcherConfig& config,
- const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
index 1300f419d7..e314da7001 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
@@ -11,7 +11,6 @@ using namespace NActors;
std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const NConfig::TRowDispatcherConfig& config,
- const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -20,7 +19,6 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
{
return NewRowDispatcher(
config,
- commonConfig,
credentialsProviderFactory,
yqSharedResources,
credentialsFactory,
diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
index ef8a9f2909..c3ee492c06 100644
--- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
+++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
@@ -16,7 +16,6 @@ namespace NFq {
std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const NConfig::TRowDispatcherConfig& config,
- const NConfig::TCommonConfig& commonConfig,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
const TYqSharedResources::TPtr& yqSharedResources,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp
index 93ccaa8c15..bdef440832 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/leader_election_ut.cpp
@@ -23,15 +23,18 @@ public:
Runtime.Initialize(app->Unwrap());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
- auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
+ YqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
RowDispatcher = Runtime.AllocateEdgeActor();
Coordinator1 = Runtime.AllocateEdgeActor();
Coordinator2 = Runtime.AllocateEdgeActor();
Coordinator3 = Runtime.AllocateEdgeActor();
+ }
+ void Init(bool localMode = false) {
NConfig::TRowDispatcherCoordinatorConfig config;
config.SetCoordinationNodePath("row_dispatcher");
+ config.SetLocalMode(localMode);
auto& database = *config.MutableDatabase();
database.SetEndpoint(GetEnv("YDB_ENDPOINT"));
database.SetDatabase(GetEnv("YDB_DATABASE"));
@@ -42,7 +45,7 @@ public:
Coordinator1,
config,
NKikimr::CreateYdbCredentialsProviderFactory,
- yqSharedResources,
+ YqSharedResources,
"/tenant",
MakeIntrusive<NMonitoring::TDynamicCounters>()
).release());
@@ -52,7 +55,7 @@ public:
Coordinator2,
config,
NKikimr::CreateYdbCredentialsProviderFactory,
- yqSharedResources,
+ YqSharedResources,
"/tenant",
MakeIntrusive<NMonitoring::TDynamicCounters>()
).release());
@@ -62,7 +65,7 @@ public:
Coordinator3,
config,
NKikimr::CreateYdbCredentialsProviderFactory,
- yqSharedResources,
+ YqSharedResources,
"/tenant",
MakeIntrusive<NMonitoring::TDynamicCounters>()
).release());
@@ -95,10 +98,12 @@ public:
NActors::TActorId Coordinator2;
NActors::TActorId Coordinator3;
NActors::TActorId LeaderDetector;
+ TYqSharedResources::TPtr YqSharedResources;
};
Y_UNIT_TEST_SUITE(LeaderElectionTests) {
Y_UNIT_TEST_F(Test1, TFixture) {
+ Init();
auto coordinatorId1 = ExpectCoordinatorChanged();
auto coordinatorId2 = ExpectCoordinatorChanged();
@@ -134,7 +139,15 @@ Y_UNIT_TEST_SUITE(LeaderElectionTests) {
auto coordinatorId6 = ExpectCoordinatorChanged();
UNIT_ASSERT(coordinatorId6 != coordinatorId4);
}
-}
+ Y_UNIT_TEST_F(TestLocalMode, TFixture) {
+ Init(true);
+ auto coordinatorId1 = ExpectCoordinatorChanged();
+ auto coordinatorId2 = ExpectCoordinatorChanged();
+ auto coordinatorId3 = ExpectCoordinatorChanged();
+ TSet<NActors::TActorId> set {coordinatorId1, coordinatorId2, coordinatorId3};
+ UNIT_ASSERT(set.size() == 3);
+ }
}
+}
diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
index f5641e8155..550c35447a 100644
--- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
@@ -61,7 +61,6 @@ public:
database.SetDatabase("YDB_DATABASE");
database.SetToken("");
- NConfig::TCommonConfig commonConfig;
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
@@ -74,7 +73,6 @@ public:
RowDispatcher = Runtime.Register(NewRowDispatcher(
config,
- commonConfig,
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp
index c882575dd1..55f284b8f8 100644
--- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp
+++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp
@@ -78,7 +78,7 @@ private:
class TFileTopicReadSession : public NYdb::NTopic::IReadSession {
-constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(100);
+constexpr static auto FILE_POLL_PERIOD = TDuration::MilliSeconds(5);
public:
TFileTopicReadSession(TFile file, NYdb::NTopic::TPartitionSession::TPtr session, const TString& producerId = ""):
@@ -182,10 +182,14 @@ private:
TString rawMsg;
TVector<TMessage> msgs;
size_t size = 0;
+ ui64 maxBatchRowSize = 100;
while (size_t read = fi.ReadLine(rawMsg)) {
msgs.emplace_back(MakeNextMessage(rawMsg));
MsgOffset_++;
+ if (!maxBatchRowSize--) {
+ break;
+ }
size += rawMsg.size();
}
if (!msgs.empty()) {