aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-07-06 11:32:40 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-07-06 11:32:40 +0300
commit06cc85228c999258c5349c0e5b283b3d53892247 (patch)
tree7ab5506e8f49165beda8490d14ceb90bb5b62f55
parent0d96d627de2058ac0fe76e3d358b4e3617629f2c (diff)
downloadydb-06cc85228c999258c5349c0e5b283b3d53892247.tar.gz
CheckClustersList after InitDCs
-rw-r--r--ydb/core/persqueue/cluster_tracker.cpp2
-rw-r--r--ydb/core/testlib/test_pq_client.h34
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h1
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp46
-rw-r--r--ydb/services/persqueue_v1/ut/persqueue_test_fixture.h3
-rw-r--r--ydb/services/persqueue_v1/ut/test_utils.h1
8 files changed, 47 insertions, 45 deletions
diff --git a/ydb/core/persqueue/cluster_tracker.cpp b/ydb/core/persqueue/cluster_tracker.cpp
index 4e83eb060f..086a2a4334 100644
--- a/ydb/core/persqueue/cluster_tracker.cpp
+++ b/ydb/core/persqueue/cluster_tracker.cpp
@@ -90,6 +90,8 @@ private:
}
void SendClustersList(TActorId subscriberId) {
+ LOG_DEBUG_S(Ctx(), NKikimrServices::PQ_METACACHE, "SendClustersList");
+
auto ev = MakeHolder<TEvClusterTracker::TEvClustersUpdate>();
ev->ClustersList = ClustersList;
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index a07f4c03c7..bd487479f8 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -2,6 +2,7 @@
#include "test_client.h"
#include <ydb/core/client/flat_ut_client.h>
+#include <ydb/core/persqueue/cluster_tracker.h>
#include <ydb/core/protos/flat_tx_scheme.pb.h>
#include <ydb/core/mind/address_classification/net_classifier.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
@@ -671,6 +672,39 @@ public:
)___");
}
+ void CheckClustersList(TTestActorRuntime* runtime, bool waitForUpdate = true, THashMap<TString, TPQTestClusterInfo> clusters = DEFAULT_CLUSTERS_LIST) {
+ UNIT_ASSERT(runtime != nullptr);
+
+ auto compareInfo = [](const TString& name, const TPQTestClusterInfo& info, const NPQ::NClusterTracker::TClustersList::TCluster& trackerInfo) {
+ UNIT_ASSERT_EQUAL(name, trackerInfo.Name);
+ UNIT_ASSERT_EQUAL(name, trackerInfo.Datacenter);
+ UNIT_ASSERT_EQUAL(info.Balancer, trackerInfo.Balancer);
+ UNIT_ASSERT_EQUAL(info.Enabled, trackerInfo.IsEnabled);
+ UNIT_ASSERT_EQUAL(info.Weight, trackerInfo.Weight);
+ };
+
+ TInstant now = TInstant::Now();
+
+ auto edgeActor = runtime->AllocateEdgeActor();
+
+ while (true) {
+ Cerr << "=== CheckClustersList\n";
+ runtime->Send(new IEventHandle(NKikimr::NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActor, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe));
+ auto trackerResponse = runtime->GrabEdgeEvent<NKikimr::NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>();
+
+ if (!waitForUpdate || trackerResponse->ClustersListUpdateTimestamp && trackerResponse->ClustersListUpdateTimestamp.GetRef() >= now + TDuration::Seconds(5)) {
+ for (auto& clusterInfo : trackerResponse->ClustersList->Clusters) {
+ auto it = clusters.find(clusterInfo.Name);
+ UNIT_ASSERT(it != clusters.end());
+ compareInfo(it->first, it->second, clusterInfo);
+ }
+ Cerr << "=== CheckClustersList Ok\n";
+ break;
+ }
+ Sleep(TDuration::MilliSeconds(100));
+ }
+ }
+
void UpdateDcEnabled(const TString& name, bool enabled) {
TStringBuilder query;
query << "UPDATE `/Root/PQ/Config/V2/Cluster` SET enabled = " << (enabled ? "true" : "false")
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
index 5801a006b1..7fb6e751e3 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
@@ -71,8 +71,10 @@ public:
dataCenters.emplace("dc2", NKikimr::NPersQueueTests::TPQTestClusterInfo{"dc2.logbroker.yandex.net", false});
}
Server.AnnoyingClient->InitDCs(dataCenters);
+ Server.AnnoyingClient->CheckClustersList(Server.CleverServer->GetRuntime(), true, dataCenters);
} else {
Server.AnnoyingClient->InitDCs(DataCenters, LocalDC);
+ Server.AnnoyingClient->CheckClustersList(Server.CleverServer->GetRuntime(), true, DataCenters);
}
Server.AnnoyingClient->InitSourceIds();
CreateTopic(GetTestTopic(), GetLocalCluster());
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp
index cfe4f984ea..673d564aae 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp
@@ -7,8 +7,7 @@ const TVector<NKikimrServices::EServiceKikimr> TTestServer::LOGGED_SERVICES = {
NKikimrServices::PQ_WRITE_PROXY,
NKikimrServices::PQ_MIRRORER,
NKikimrServices::PQ_METACACHE,
- NKikimrServices::PERSQUEUE,
- NKikimrServices::FLAT_TX_SCHEMESHARD
+ NKikimrServices::PERSQUEUE
};
} // namespace NPersQueue
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
index a71da8cccc..0486f4bb64 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
@@ -50,6 +50,7 @@ public:
AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort, databaseName);
if (doClientInit) {
AnnoyingClient->FullInit();
+ AnnoyingClient->CheckClustersList(CleverServer->GetRuntime());
}
}
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 091bd0685c..b01b3dfde8 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -5929,57 +5929,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Y_UNIT_TEST(TClusterTrackerTest) {
APITestSetup setup{TEST_CASE_NAME};
setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
- const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor();
THashMap<TString, TPQTestClusterInfo> clusters = DEFAULT_CLUSTERS_LIST;
- auto compareInfo = [](const TString& name, const TPQTestClusterInfo& info, const NPQ::NClusterTracker::TClustersList::TCluster& trackerInfo) {
- UNIT_ASSERT_EQUAL(name, trackerInfo.Name);
- UNIT_ASSERT_EQUAL(name, trackerInfo.Datacenter);
- UNIT_ASSERT_EQUAL(info.Balancer, trackerInfo.Balancer);
- UNIT_ASSERT_EQUAL(info.Enabled, trackerInfo.IsEnabled);
- UNIT_ASSERT_EQUAL(info.Weight, trackerInfo.Weight);
- };
-
- auto getClustersFromTracker = [&]() {
- setup.GetServer().GetRuntime()->Send(new IEventHandle(
- NPQ::NClusterTracker::MakeClusterTrackerID(),
- edgeActorID,
- new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe
- ));
- return setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>();
- };
-
-
- {
- auto trackerResponce = getClustersFromTracker();
- for (auto& clusterInfo : trackerResponce->ClustersList->Clusters) {
- auto it = clusters.find(clusterInfo.Name);
- UNIT_ASSERT(it != clusters.end());
- compareInfo(it->first, it->second, clusterInfo);
- }
- }
+ auto runtime = setup.GetServer().GetRuntime();
+ setup.GetFlatMsgBusPQClient().CheckClustersList(runtime, false, clusters);
UNIT_ASSERT_EQUAL(clusters.count("dc1"), 1);
UNIT_ASSERT_EQUAL(clusters.count("dc2"), 1);
clusters["dc1"].Weight = 666;
clusters["dc2"].Balancer = "newbalancer.net";
setup.GetFlatMsgBusPQClient().InitDCs(clusters);
- TInstant updateTime = TInstant::Now();
-
- while (true) {
- auto trackerResponce = getClustersFromTracker();
- if (trackerResponce->ClustersListUpdateTimestamp) {
- if (trackerResponce->ClustersListUpdateTimestamp.GetRef() >= updateTime + TDuration::Seconds(5)) {
- for (auto& clusterInfo : trackerResponce->ClustersList->Clusters) {
- auto it = clusters.find(clusterInfo.Name);
- UNIT_ASSERT(it != clusters.end());
- compareInfo(it->first, it->second, clusterInfo);
- }
- break;
- }
- }
- Sleep(TDuration::MilliSeconds(100));
- }
+ setup.GetFlatMsgBusPQClient().CheckClustersList(runtime, true, clusters);
}
Y_UNIT_TEST(TestReadPartitionByGroupId) {
diff --git a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h
index 3cf0887007..3d23cfd90b 100644
--- a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h
+++ b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h
@@ -87,6 +87,9 @@ static void ModifyTopicACL(NYdb::TDriver* driver, const TString& topic, const TV
Server->AnnoyingClient->FullInit();
+ if (!TenantModeEnabled())
+ Server->AnnoyingClient->CheckClustersList(Server->CleverServer->GetRuntime());
+
Server->AnnoyingClient->CreateConsumer("user");
if (TenantModeEnabled()) {
Cerr << "=== Will create fst-class topics\n";
diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h
index ac4645f8b7..1ce0ee0bb2 100644
--- a/ydb/services/persqueue_v1/ut/test_utils.h
+++ b/ydb/services/persqueue_v1/ut/test_utils.h
@@ -29,6 +29,7 @@ using namespace NKikimr::NPersQueueTests;
server.EnableGRpc(NGrpc::TServerOptions().SetHost("localhost").SetPort(grpc));\
TFlatMsgBusPQClient client(settings, grpc);\
client.FullInit();\
+ client.CheckClustersList(server->GetRuntime());\
client.CreateTopic("rt3.dc1--" + TString(topic), 1);\
EnableLogs(server, { NKikimrServices::PQ_WRITE_PROXY });\
TPQDataWriter writer(messageGroupId, grpc, client, server.GetRuntime());\