diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-07-06 11:32:40 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-07-06 11:32:40 +0300 |
commit | 06cc85228c999258c5349c0e5b283b3d53892247 (patch) | |
tree | 7ab5506e8f49165beda8490d14ceb90bb5b62f55 | |
parent | 0d96d627de2058ac0fe76e3d358b4e3617629f2c (diff) | |
download | ydb-06cc85228c999258c5349c0e5b283b3d53892247.tar.gz |
CheckClustersList after InitDCs
8 files changed, 47 insertions, 45 deletions
diff --git a/ydb/core/persqueue/cluster_tracker.cpp b/ydb/core/persqueue/cluster_tracker.cpp index 4e83eb060f..086a2a4334 100644 --- a/ydb/core/persqueue/cluster_tracker.cpp +++ b/ydb/core/persqueue/cluster_tracker.cpp @@ -90,6 +90,8 @@ private: } void SendClustersList(TActorId subscriberId) { + LOG_DEBUG_S(Ctx(), NKikimrServices::PQ_METACACHE, "SendClustersList"); + auto ev = MakeHolder<TEvClusterTracker::TEvClustersUpdate>(); ev->ClustersList = ClustersList; diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index a07f4c03c7..bd487479f8 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -2,6 +2,7 @@ #include "test_client.h" #include <ydb/core/client/flat_ut_client.h> +#include <ydb/core/persqueue/cluster_tracker.h> #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/mind/address_classification/net_classifier.h> #include <ydb/public/lib/deprecated/kicli/kicli.h> @@ -671,6 +672,39 @@ public: )___"); } + void CheckClustersList(TTestActorRuntime* runtime, bool waitForUpdate = true, THashMap<TString, TPQTestClusterInfo> clusters = DEFAULT_CLUSTERS_LIST) { + UNIT_ASSERT(runtime != nullptr); + + auto compareInfo = [](const TString& name, const TPQTestClusterInfo& info, const NPQ::NClusterTracker::TClustersList::TCluster& trackerInfo) { + UNIT_ASSERT_EQUAL(name, trackerInfo.Name); + UNIT_ASSERT_EQUAL(name, trackerInfo.Datacenter); + UNIT_ASSERT_EQUAL(info.Balancer, trackerInfo.Balancer); + UNIT_ASSERT_EQUAL(info.Enabled, trackerInfo.IsEnabled); + UNIT_ASSERT_EQUAL(info.Weight, trackerInfo.Weight); + }; + + TInstant now = TInstant::Now(); + + auto edgeActor = runtime->AllocateEdgeActor(); + + while (true) { + Cerr << "=== CheckClustersList\n"; + runtime->Send(new IEventHandle(NKikimr::NPQ::NClusterTracker::MakeClusterTrackerID(), edgeActor, new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe)); + auto trackerResponse = runtime->GrabEdgeEvent<NKikimr::NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>(); + + if (!waitForUpdate || trackerResponse->ClustersListUpdateTimestamp && trackerResponse->ClustersListUpdateTimestamp.GetRef() >= now + TDuration::Seconds(5)) { + for (auto& clusterInfo : trackerResponse->ClustersList->Clusters) { + auto it = clusters.find(clusterInfo.Name); + UNIT_ASSERT(it != clusters.end()); + compareInfo(it->first, it->second, clusterInfo); + } + Cerr << "=== CheckClustersList Ok\n"; + break; + } + Sleep(TDuration::MilliSeconds(100)); + } + } + void UpdateDcEnabled(const TString& name, bool enabled) { TStringBuilder query; query << "UPDATE `/Root/PQ/Config/V2/Cluster` SET enabled = " << (enabled ? "true" : "false") diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h index 5801a006b1..7fb6e751e3 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h @@ -71,8 +71,10 @@ public: dataCenters.emplace("dc2", NKikimr::NPersQueueTests::TPQTestClusterInfo{"dc2.logbroker.yandex.net", false}); } Server.AnnoyingClient->InitDCs(dataCenters); + Server.AnnoyingClient->CheckClustersList(Server.CleverServer->GetRuntime(), true, dataCenters); } else { Server.AnnoyingClient->InitDCs(DataCenters, LocalDC); + Server.AnnoyingClient->CheckClustersList(Server.CleverServer->GetRuntime(), true, DataCenters); } Server.AnnoyingClient->InitSourceIds(); CreateTopic(GetTestTopic(), GetLocalCluster()); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp index cfe4f984ea..673d564aae 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.cpp @@ -7,8 +7,7 @@ const TVector<NKikimrServices::EServiceKikimr> TTestServer::LOGGED_SERVICES = { NKikimrServices::PQ_WRITE_PROXY, NKikimrServices::PQ_MIRRORER, NKikimrServices::PQ_METACACHE, - NKikimrServices::PERSQUEUE, - NKikimrServices::FLAT_TX_SCHEMESHARD + NKikimrServices::PERSQUEUE }; } // namespace NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h index a71da8cccc..0486f4bb64 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h @@ -50,6 +50,7 @@ public: AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort, databaseName); if (doClientInit) { AnnoyingClient->FullInit(); + AnnoyingClient->CheckClustersList(CleverServer->GetRuntime()); } } diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 091bd0685c..b01b3dfde8 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -5929,57 +5929,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Y_UNIT_TEST(TClusterTrackerTest) { APITestSetup setup{TEST_CASE_NAME}; setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); - const auto edgeActorID = setup.GetServer().GetRuntime()->AllocateEdgeActor(); THashMap<TString, TPQTestClusterInfo> clusters = DEFAULT_CLUSTERS_LIST; - auto compareInfo = [](const TString& name, const TPQTestClusterInfo& info, const NPQ::NClusterTracker::TClustersList::TCluster& trackerInfo) { - UNIT_ASSERT_EQUAL(name, trackerInfo.Name); - UNIT_ASSERT_EQUAL(name, trackerInfo.Datacenter); - UNIT_ASSERT_EQUAL(info.Balancer, trackerInfo.Balancer); - UNIT_ASSERT_EQUAL(info.Enabled, trackerInfo.IsEnabled); - UNIT_ASSERT_EQUAL(info.Weight, trackerInfo.Weight); - }; - - auto getClustersFromTracker = [&]() { - setup.GetServer().GetRuntime()->Send(new IEventHandle( - NPQ::NClusterTracker::MakeClusterTrackerID(), - edgeActorID, - new NPQ::NClusterTracker::TEvClusterTracker::TEvSubscribe - )); - return setup.GetServer().GetRuntime()->GrabEdgeEvent<NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate>(); - }; - - - { - auto trackerResponce = getClustersFromTracker(); - for (auto& clusterInfo : trackerResponce->ClustersList->Clusters) { - auto it = clusters.find(clusterInfo.Name); - UNIT_ASSERT(it != clusters.end()); - compareInfo(it->first, it->second, clusterInfo); - } - } + auto runtime = setup.GetServer().GetRuntime(); + setup.GetFlatMsgBusPQClient().CheckClustersList(runtime, false, clusters); UNIT_ASSERT_EQUAL(clusters.count("dc1"), 1); UNIT_ASSERT_EQUAL(clusters.count("dc2"), 1); clusters["dc1"].Weight = 666; clusters["dc2"].Balancer = "newbalancer.net"; setup.GetFlatMsgBusPQClient().InitDCs(clusters); - TInstant updateTime = TInstant::Now(); - - while (true) { - auto trackerResponce = getClustersFromTracker(); - if (trackerResponce->ClustersListUpdateTimestamp) { - if (trackerResponce->ClustersListUpdateTimestamp.GetRef() >= updateTime + TDuration::Seconds(5)) { - for (auto& clusterInfo : trackerResponce->ClustersList->Clusters) { - auto it = clusters.find(clusterInfo.Name); - UNIT_ASSERT(it != clusters.end()); - compareInfo(it->first, it->second, clusterInfo); - } - break; - } - } - Sleep(TDuration::MilliSeconds(100)); - } + setup.GetFlatMsgBusPQClient().CheckClustersList(runtime, true, clusters); } Y_UNIT_TEST(TestReadPartitionByGroupId) { diff --git a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h index 3cf0887007..3d23cfd90b 100644 --- a/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h +++ b/ydb/services/persqueue_v1/ut/persqueue_test_fixture.h @@ -87,6 +87,9 @@ static void ModifyTopicACL(NYdb::TDriver* driver, const TString& topic, const TV Server->AnnoyingClient->FullInit(); + if (!TenantModeEnabled()) + Server->AnnoyingClient->CheckClustersList(Server->CleverServer->GetRuntime()); + Server->AnnoyingClient->CreateConsumer("user"); if (TenantModeEnabled()) { Cerr << "=== Will create fst-class topics\n"; diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h index ac4645f8b7..1ce0ee0bb2 100644 --- a/ydb/services/persqueue_v1/ut/test_utils.h +++ b/ydb/services/persqueue_v1/ut/test_utils.h @@ -29,6 +29,7 @@ using namespace NKikimr::NPersQueueTests; server.EnableGRpc(NGrpc::TServerOptions().SetHost("localhost").SetPort(grpc));\ TFlatMsgBusPQClient client(settings, grpc);\ client.FullInit();\ + client.CheckClustersList(server->GetRuntime());\ client.CreateTopic("rt3.dc1--" + TString(topic), 1);\ EnableLogs(server, { NKikimrServices::PQ_WRITE_PROXY });\ TPQDataWriter writer(messageGroupId, grpc, client, server.GetRuntime());\ |