aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-08-02 16:38:42 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-08-02 16:38:42 +0300
commit47614c6616ffab4975a9ddbc5956166f9334061a (patch)
treef8b98e65eac51440cfa3f1504cdc86fab94c997d
parentf8a265e9e38423af909b4a7223b90d7c0bce51dc (diff)
downloadydb-47614c6616ffab4975a9ddbc5956166f9334061a.tar.gz
DescribeTopic test fix
-rw-r--r--ydb/core/persqueue/read_balancer.cpp13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h26
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h44
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp3
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp6
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp20
6 files changed, 70 insertions, 42 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index d05a169ab63..9a785c7fb06 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -663,14 +663,19 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr&
void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
{
- ClosePipe(ev->Get()->TabletId, ctx);
- RequestTabletIfNeeded(ev->Get()->TabletId, ctx);
+ auto tabletId = ev->Get()->TabletId;
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "TEvClientDestroyed " << tabletId);
+
+ ClosePipe(tabletId, ctx);
+ RequestTabletIfNeeded(tabletId, ctx);
}
void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx)
{
auto tabletId = ev->Get()->TabletId;
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "TEvClientConnected TabletId " << tabletId);
+
PipesRequested.erase(tabletId);
if (ev->Get()->Status != NKikimrProto::OK) {
@@ -681,6 +686,8 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev,
if (!it.IsEnd()) {
it->second.Generation = ev->Get()->Generation;
it->second.NodeId = ev->Get()->ServerId.NodeId();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "TEvClientConnected Generation " << ev->Get()->Generation << ", NodeId " << ev->Get()->ServerId.NodeId());
}
}
}
@@ -1345,6 +1352,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr
}
pResponse->SetNodeId(iter->second.NodeId.GetRef());
pResponse->SetGeneration(iter->second.Generation.GetRef());
+
+ LOG_CRIT_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, GetPrefix() << "addPartitionToResponse tabletId " << tabletId << ", partitionId " << partitionId << ", NodeId " << pResponse->GetNodeId() << ", Generation " << pResponse->GetGeneration());
return true;
};
auto sendResponse = [&](bool status) {
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
index d526e608452..ee50f509d99 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
@@ -16,7 +16,7 @@ protected:
THashMap<TString, NKikimr::NPersQueueTests::TPQTestClusterInfo> DataCenters;
TString LocalDC = "dc1";
TTestServer Server;
- TLog Log = TLog("cerr");
+ TLog Log = CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG);
size_t TopicPartitionsCount = 1;
public:
@@ -26,7 +26,8 @@ public:
size_t topicPartitionsCount = 1)
: TestCaseName(testCaseName)
, Server(false, Nothing(), logServices, logPriority)
- , TopicPartitionsCount(topicPartitionsCount) {
+ , TopicPartitionsCount(topicPartitionsCount)
+ {
InitOptions();
if (start) {
Start();
@@ -137,6 +138,10 @@ public:
return Log;
}
+ TTestServer& GetServer() {
+ return Server;
+ }
+
template <class TConsumerOrProducer>
void Start(const THolder<TConsumerOrProducer>& obj) {
auto startFuture = obj->Start();
@@ -223,22 +228,5 @@ public:
void CreateTopic(const TString& topic, const TString& cluster, size_t partitionsCount = 1) {
Server.AnnoyingClient->CreateTopic(BuildFullTopicName(topic, cluster), partitionsCount);
}
-
- void KillPqrb(const TString& topic, const TString& cluster) {
- auto describeResult = Server.AnnoyingClient->Ls(TStringBuilder() << "/Root/PQ/" << BuildFullTopicName(topic, cluster));
- UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record);
- Server.AnnoyingClient->KillTablet(*Server.CleverServer, describeResult->Record.GetPathDescription().GetPersQueueGroup().GetBalancerTabletID());
- }
-
- void KillTopicTablets(const TString& topicName) {
- auto pqGroup = Server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription().GetPersQueueGroup();
-
- THashSet<ui64> restartedTablets;
- for (const auto& p : pqGroup.GetPartitions())
- if (restartedTablets.insert(p.GetTabletId()).second)
- Server.AnnoyingClient->KillTablet(*Server.CleverServer, p.GetTabletId());
-
- Server.CleverServer->GetRuntime()->DispatchEvents();
- }
};
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
index 9e3c38b2a41..1f8eebb07bd 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h
@@ -47,12 +47,16 @@ public:
}
void StartServer(bool doClientInit = true, TMaybe<TString> databaseName = Nothing()) {
+ Log.SetFormatter([](ELogPriority priority, TStringBuf message) {
+ return TStringBuilder() << TInstant::Now() << " " << priority << ": " << message << Endl;
+ });
+
PrepareNetDataFile();
CleverServer = MakeHolder<NKikimr::Tests::TServer>(ServerSettings);
CleverServer->EnableGRpc(GrpcServerOptions);
- Cerr << "TTestServer started on Port " << Port << " GrpcPort " << GrpcPort << Endl;
+ Log << TLOG_INFO << "TTestServer started on Port " << Port << " GrpcPort " << GrpcPort;
AnnoyingClient = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(ServerSettings, GrpcPort, databaseName);
if (doClientInit) {
@@ -108,7 +112,43 @@ public:
return CleverServer->GetDriver();
}
+ void KillTopicPqrbTablet(const TString& topicPath) {
+ KillTopicTablets(topicPath, true, false);
+ }
+
+ void KillTopicPqTablets(const TString& topicPath) {
+ KillTopicTablets(topicPath, false, true);
+ }
+
+private:
+ void KillTopicTablets(const TString& topicPath, bool killPqrb, bool killPq) {
+ auto describeResult = AnnoyingClient->Ls(topicPath);
+ UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record);
+ auto persQueueGroup = describeResult->Record.GetPathDescription().GetPersQueueGroup();
+
+ if (killPqrb)
+ {
+ Log << TLOG_INFO << "Kill PQRB tablet " << persQueueGroup.GetBalancerTabletID();
+ AnnoyingClient->KillTablet(*CleverServer, persQueueGroup.GetBalancerTabletID());
+ }
+
+ if (killPq)
+ {
+ THashSet<ui64> restartedTablets;
+ for (const auto& p : persQueueGroup.GetPartitions())
+ if (restartedTablets.insert(p.GetTabletId()).second)
+ {
+ Log << TLOG_INFO << "Kill PQ tablet " << p.GetTabletId();
+ AnnoyingClient->KillTablet(*CleverServer, p.GetTabletId());
+ }
+ }
+
+ CleverServer->GetRuntime()->DispatchEvents();
+ }
+
public:
+ TString TestCaseName;
+
TSimpleSharedPtr<TPortManager> PortManager;
ui16 Port;
ui16 GrpcPort;
@@ -118,6 +158,8 @@ public:
NGrpc::TServerOptions GrpcServerOptions;
THolder<TTempFileHandle> NetDataFile;
+ TLog Log = CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG);
+
THolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient> AnnoyingClient;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
index 566c70490bd..9e7c7d9ed45 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
@@ -300,8 +300,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
WaitPlannedTasks(e, n);
size_t completed = e->GetExecutedCount();
- setup->KillPqrb(setup->GetTestTopic(), setup->GetLocalCluster());
- Cerr << ">>> TEST: PQRB killed" << Endl;
+ setup->GetServer().KillTopicPqrbTablet(setup->GetTestTopicPath());
Sleep(TDuration::MilliSeconds(100));
e->StartFuncs(tasks);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
index 0e172f64aa8..559ea706ffb 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/describe_topic_ut.cpp
@@ -63,7 +63,7 @@ namespace NYdb::NTopic::NTests {
if (killTablets)
{
- setup.KillTopicTablets(setup.GetTestTopicPath());
+ setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath());
auto result = client.DescribeTopic(setup.GetTestTopicPath(), settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
@@ -162,7 +162,7 @@ namespace NYdb::NTopic::NTests {
if (killTablets)
{
- setup.KillTopicTablets(setup.GetTestTopicPath());
+ setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath());
auto result = client.DescribeConsumer(setup.GetTestTopicPath(), ::NPersQueue::SDKTestSetup::GetTestConsumer(), settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
@@ -235,7 +235,7 @@ namespace NYdb::NTopic::NTests {
if (killTablets)
{
- setup.KillTopicTablets(setup.GetTestTopicPath());
+ setup.GetServer().KillTopicPqTablets(setup.GetTestTopicPath());
auto result = client.DescribePartition(setup.GetTestTopicPath(), testPartitionId, settings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
index 66a332f89ee..376e031d9e6 100644
--- a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
+++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
@@ -13,6 +13,7 @@ using namespace NKikimr::NGRpcProxy::V1;
using namespace NIcNodeCache;
const static TString topicName = "rt3.dc1--topic-x";
+const static TString topicPath = "/Root/PQ/" + topicName;
class TDescribeTestServer {
public:
@@ -174,17 +175,6 @@ private:
};
Y_UNIT_TEST_SUITE(TTopicApiDescribes) {
- void KillTopicTablets(NPersQueue::TTestServer& server, const TString& topicName) {
- auto pqGroup = server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription()
- .GetPersQueueGroup();
-
- THashSet<ui64> restartedTablets;
- for (const auto& p : pqGroup.GetPartitions())
- if (restartedTablets.insert(p.GetTabletId()).second)
- server.AnnoyingClient->KillTablet(*server.CleverServer, p.GetTabletId());
-
- server.CleverServer->GetRuntime()->DispatchEvents();
- }
Y_UNIT_TEST(GetLocalDescribe) {
TDescribeTestServer server{};
@@ -254,11 +244,11 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) {
server.DescribePartition(150, true, false);
server.DescribePartition(2, false, true);
server.DescribePartition(0, false, false);
- KillTopicTablets(server.Server, topicName);
+ server.Server.KillTopicPqTablets(topicPath);
bool checkRes = server.DescribePartition(1, true, false, true);
if (!checkRes) {
- KillTopicTablets(server.Server, topicName);
+ server.Server.KillTopicPqTablets(topicPath);
server.DescribePartition(1, true, false);
}
server.DescribePartition(3, true, true);
@@ -270,7 +260,7 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) {
auto server = TDescribeTestServer();
Cerr << "Describe topic with stats and location\n";
server.DescribeTopic(true, true);
- KillTopicTablets(server.Server, topicName);
+ server.Server.KillTopicPqTablets(topicPath);
Cerr << "Describe topic with stats\n";
server.DescribeTopic(true, false);
Cerr << "Describe topic with location\n";
@@ -285,7 +275,7 @@ Y_UNIT_TEST_SUITE(TTopicApiDescribes) {
auto server = TDescribeTestServer();
server.AddConsumer("my-consumer");
server.DescribeConsumer("my-consumer", true, true);
- KillTopicTablets(server.Server, topicName);
+ server.Server.KillTopicPqTablets(topicPath);
server.DescribeConsumer("my-consumer",true, false);
server.DescribeConsumer("my-consumer",false, true);
server.DescribeConsumer("my-consumer",false, false);