diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-08-02 16:38:42 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-08-02 16:38:42 +0300 |
commit | 47614c6616ffab4975a9ddbc5956166f9334061a (patch) | |
tree | f8b98e65eac51440cfa3f1504cdc86fab94c997d | |
parent | f8a265e9e38423af909b4a7223b90d7c0bce51dc (diff) | |
download | ydb-47614c6616ffab4975a9ddbc5956166f9334061a.tar.gz |
DescribeTopic test fix
6 files changed, 70 insertions, 42 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index d05a169ab63..9a785c7fb06 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -663,14 +663,19 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - ClosePipe(ev->Get()->TabletId, ctx); - RequestTabletIfNeeded(ev->Get()->TabletId, ctx); + auto tabletId = ev->Get()->TabletId; + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "TEvClientDestroyed " << tabletId); + + ClosePipe(tabletId, ctx); + RequestTabletIfNeeded(tabletId, ctx); } void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { auto tabletId = ev->Get()->TabletId; + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "TEvClientConnected TabletId " << tabletId); + PipesRequested.erase(tabletId); if (ev->Get()->Status != NKikimrProto::OK) { @@ -681,6 +686,8 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, if (!it.IsEnd()) { it->second.Generation = ev->Get()->Generation; it->second.NodeId = ev->Get()->ServerId.NodeId(); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "TEvClientConnected Generation " << ev->Get()->Generation << ", NodeId " << ev->Get()->ServerId.NodeId()); } } } @@ -1345,6 +1352,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr } pResponse->SetNodeId(iter->second.NodeId.GetRef()); pResponse->SetGeneration(iter->second.Generation.GetRef()); + + LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "addPartitionToResponse tabletId " << tabletId << ", partitionId " << partitionId << ", NodeId " << pResponse->GetNodeId() << ", Generation " << pResponse->GetGeneration()); return true; }; auto sendResponse = [&](bool status) { diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h index d526e608452..ee50f509d99 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h @@ -16,7 +16,7 @@ protected: THashMap<TString, NKikimr::NPersQueueTests::TPQTestClusterInfo> DataCenters; TString LocalDC = "dc1"; TTestServer Server; - TLog Log = TLog("cerr"); + TLog Log = CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG); size_t TopicPartitionsCount = 1; public: @@ -26,7 +26,8 @@ public: size_t topicPartitionsCount = 1) : TestCaseName(testCaseName) , Server(false, Nothing(), logServices, logPriority) - , TopicPartitionsCount(topicPartitionsCount) { + , TopicPartitionsCount(topicPartitionsCount) + { InitOptions(); if (start) { Start(); @@ -137,6 +138,10 @@ public: return Log; } + TTestServer& GetServer() { + return Server; + } + template <class TConsumerOrProducer> void Start(const THolder<TConsumerOrProducer>& obj) { auto startFuture = obj->Start(); @@ -223,22 +228,5 @@ public: void CreateTopic(const TString& topic, const TString& cluster, size_t partitionsCount = 1) { Server.AnnoyingClient->CreateTopic(BuildFullTopicName(topic, cluster), partitionsCount); } - - void KillPqrb(const TString& topic, const TString& cluster) { - auto describeResult = Server.AnnoyingClient->Ls(TStringBuilder() << "/Root/PQ/" << BuildFullTopicName(topic, cluster)); - UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record); - Server.AnnoyingClient->KillTablet(*Server.CleverServer, describeResult->Record.GetPathDescription().GetPersQueueGroup().GetBalancerTabletID()); - } - - void KillTopicTablets(const TString& topicName) { - auto pqGroup = Server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription().GetPersQueueGroup(); - - THashSet<ui64> restartedTablets; - for (const auto& p : pqGroup.GetPartitions()) - if (restartedTablets.insert(p.GetTabletId()).second) - Server.AnnoyingClient->KillTablet(*Server.CleverServer, p.GetTabletId()); - - Server.CleverServer->GetRuntime()->DispatchEvents(); - } }; } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h index 9e3c38b2a41..1f8eebb07bd 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h @@ -47,12 +47,16 @@ public: } void StartServer(bool doClientInit = true, TMaybe<TString> databaseName = Nothing()) { + Log.SetFormatter([](ELogPriority priority, TStringBuf message) { + return TStringBuilder() << TInstant::Now() << " " << priority << ": " << message << Endl; + }); + PrepareNetDataFile(); CleverServer = MakeHolder<NKikimr::Tests::TServer>(ServerSettings); CleverServer->EnableGRpc(GrpcServerOptions); - Cerr << "TTestServer started on Port " << Port << " GrpcPort " << GrpcPort << Endl; + Log << TLOG_INFO << "TTestServer started on Port " << Port << " GrpcPort " << GrpcPort; AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort, databaseName); if (doClientInit) { @@ -108,7 +112,43 @@ public: return CleverServer->GetDriver(); } + void KillTopicPqrbTablet(const TString& topicPath) { + KillTopicTablets(topicPath, true, false); + } + + void KillTopicPqTablets(const TString& topicPath) { + KillTopicTablets(topicPath, false, true); + } + +private: + void KillTopicTablets(const TString& topicPath, bool killPqrb, bool killPq) { + auto describeResult = AnnoyingClient->Ls(topicPath); + UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record); + auto persQueueGroup = describeResult->Record.GetPathDescription().GetPersQueueGroup(); + + if (killPqrb) + { + Log << TLOG_INFO << "Kill PQRB tablet " << persQueueGroup.GetBalancerTabletID(); + AnnoyingClient->KillTablet(*CleverServer, persQueueGroup.GetBalancerTabletID()); + } + + if (killPq) + { + THashSet<ui64> restartedTablets; + for (const auto& p : persQueueGroup.GetPartitions()) + if (restartedTablets.insert(p.GetTabletId()).second) + { + Log << TLOG_INFO << "Kill PQ tablet " << p.GetTabletId(); + AnnoyingClient->KillTablet(*CleverServer, p.GetTabletId()); + } + } + + CleverServer->GetRuntime()->DispatchEvents(); + } + public: + TString TestCaseName; + TSimpleSharedPtr<TPortManager> PortManager; ui16 Port; ui16 GrpcPort; @@ -118,6 +158,8 @@ public: NGrpc::TServerOptions GrpcServerOptions; THolder<TTempFileHandle> NetDataFile; + TLog Log = CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG); + THolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient> AnnoyingClient; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index 566c70490bd..9e7c7d9ed45 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -300,8 +300,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { WaitPlannedTasks(e, n); size_t completed = e->GetExecutedCount(); - setup->KillPqrb(setup->GetTestTopic(), setup->GetLocalCluster()); - Cerr << ">>> TEST: PQRB killed" << Endl; + setup->GetServer().KillTopicPqrbTablet(setup->GetTestTopicPath()); Sleep(TDuration::MilliSeconds(100)); e->StartFuncs(tasks); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp index 0e172f64aa8..559ea706ffb 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp @@ -63,7 +63,7 @@ namespace NYdb::NTopic::NTests { if (killTablets) { - setup.KillTopicTablets(setup.GetTestTopicPath()); + setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath()); auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); @@ -162,7 +162,7 @@ namespace NYdb::NTopic::NTests { if (killTablets) { - setup.KillTopicTablets(setup.GetTestTopicPath()); + setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath()); auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); @@ -235,7 +235,7 @@ namespace NYdb::NTopic::NTests { if (killTablets) { - setup.KillTopicTablets(setup.GetTestTopicPath()); + setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath()); auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp index 66a332f89ee..376e031d9e6 100644 --- a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp @@ -13,6 +13,7 @@ using namespace NKikimr::NGRpcProxy::V1; using namespace NIcNodeCache; const static TString topicName = "rt3.dc1--topic-x"; +const static TString topicPath = "/Root/PQ/" + topicName; class TDescribeTestServer { public: @@ -174,17 +175,6 @@ private: }; Y_UNIT_TEST_SUITE(TTopicApiDescribes) { - void KillTopicTablets(NPersQueue::TTestServer& server, const TString& topicName) { - auto pqGroup = server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription() - .GetPersQueueGroup(); - - THashSet<ui64> restartedTablets; - for (const auto& p : pqGroup.GetPartitions()) - if (restartedTablets.insert(p.GetTabletId()).second) - server.AnnoyingClient->KillTablet(*server.CleverServer, p.GetTabletId()); - - server.CleverServer->GetRuntime()->DispatchEvents(); - } Y_UNIT_TEST(GetLocalDescribe) { TDescribeTestServer server{}; @@ -254,11 +244,11 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) { server.DescribePartition(150, true, false); server.DescribePartition(2, false, true); server.DescribePartition(0, false, false); - KillTopicTablets(server.Server, topicName); + server.Server.KillTopicPqTablets(topicPath); bool checkRes = server.DescribePartition(1, true, false, true); if (!checkRes) { - KillTopicTablets(server.Server, topicName); + server.Server.KillTopicPqTablets(topicPath); server.DescribePartition(1, true, false); } server.DescribePartition(3, true, true); @@ -270,7 +260,7 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) { auto server = TDescribeTestServer(); Cerr << "Describe topic with stats and location\n"; server.DescribeTopic(true, true); - KillTopicTablets(server.Server, topicName); + server.Server.KillTopicPqTablets(topicPath); Cerr << "Describe topic with stats\n"; server.DescribeTopic(true, false); Cerr << "Describe topic with location\n"; @@ -285,7 +275,7 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) { auto server = TDescribeTestServer(); server.AddConsumer("my-consumer"); server.DescribeConsumer("my-consumer", true, true); - KillTopicTablets(server.Server, topicName); + server.Server.KillTopicPqTablets(topicPath); server.DescribeConsumer("my-consumer",true, false); server.DescribeConsumer("my-consumer",false, true); server.DescribeConsumer("my-consumer",false, false); |