diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-08-11 09:04:19 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-08-11 09:43:18 +0300 |
commit | 272a427545ac185dd72a5cae6050e608e19e022e (patch) | |
tree | 5b8dab06f9f61c421ebbf392b51dad14e15dfd26 | |
parent | 1d3bb5ec7d2ebb46ee79de9518f5655e3cf46056 (diff) | |
download | ydb-272a427545ac185dd72a5cae6050e608e19e022e.tar.gz |
Check NodeId in TEvClientConnected
4 files changed, 46 insertions, 27 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index d324772fbb..6c52faf1aa 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -453,11 +453,20 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { Y_VERIFY_DEBUG_S(msg->Generation, "Tablet generation should be greater than 0"); - if (ExpectedGeneration && *ExpectedGeneration != msg->Generation) + if (ExpectedGeneration) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation); - Disconnected(); - PassAway(); + if(*ExpectedGeneration != msg->Generation) + { + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation); + Disconnected(); + PassAway(); + } + if (NActors::TActivationContext::ActorSystem()->NodeId != msg->ServerId.NodeId()) + { + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId()); + Disconnected(); + PassAway(); + } } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h index fd442f48ce..cf1d08f008 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h @@ -23,21 +23,23 @@ public: SDKTestSetup(const TString& testCaseName, bool start = true, const TVector<NKikimrServices::EServiceKikimr>& logServices = TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG, + ui32 nodeCount = 2, size_t topicPartitionsCount = 1) : TestCaseName(testCaseName) , Server(false, Nothing(), logServices, logPriority) , TopicPartitionsCount(topicPartitionsCount) { - InitOptions(); + InitOptions(nodeCount); if (start) { Start(); } } - void InitOptions() { + void InitOptions(ui32 nodeCount = 2) { Log.SetFormatter([testCaseName = TestCaseName](ELogPriority priority, TStringBuf message) { return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl; }); + Server.ServerSettings.SetNodeCount(nodeCount); Server.GrpcServerOptions.SetGRpcShutdownDeadline(TDuration::Max()); // Default TTestServer value for 'MaxReadCookies' is 10. With this value the tests are flapping with two errors: // 1. 'got more than 10 unordered cookies to commit 12' diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h index 80732d1809..b43e369e22 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h @@ -17,8 +17,11 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup { TAdaptiveLock Lock; public: TPersQueueYdbSdkTestSetup(const TString& testCaseName, bool start = true, - const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG, size_t topicPartitionsCount = 1) - : SDKTestSetup(testCaseName, start, logServices, logPriority, topicPartitionsCount) + const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES, + NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG, + ui32 nodeCount = 2, + size_t topicPartitionsCount = 1) + : SDKTestSetup(testCaseName, start, logServices, logPriority, nodeCount, topicPartitionsCount) { } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp index 322123d899..96b10a8464 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp @@ -18,6 +18,10 @@ using namespace NYdb::NPersQueue::NTests; namespace NYdb::NTopic::NTests { Y_UNIT_TEST_SUITE(LocalPartition) { + std::shared_ptr<TPersQueueYdbSdkTestSetup> CreateSetup(TString testCaseName, ui32 nodeCount = 1) { + return std::make_shared<TPersQueueYdbSdkTestSetup>(testCaseName, true, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, nodeCount, 1); + } + NYdb::TDriverConfig CreateConfig(TString discoveryAddr) { return NYdb::TDriverConfig() @@ -138,9 +142,9 @@ namespace NYdb::NTopic::NTests { UNIT_ASSERT(context); - if(Delay) + if (Delay) { - Cerr << "==== Delay " << Delay << " before ListEndpoints request" << Endl; + Cerr << "==== Delay " << Delay << " before ListEndpoints request" << Endl; TInstant start = TInstant::Now(); while (start + Delay < TInstant::Now()) { @@ -186,7 +190,7 @@ namespace NYdb::NTopic::NTests { std::shared_ptr<TMockDiscoveryService> MockDiscoveryService; }; - auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(testCaseName); + auto setup = CreateSetup(testCaseName); if (!mockDiscoveryService) { @@ -220,15 +224,16 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(DescribeBadPartition) { - TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + auto setup = CreateSetup(TEST_CASE_NAME); + TMockDiscoveryService discovery; - discovery.SetGoodEndpoints(setup); + discovery.SetGoodEndpoints(*setup); auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(); // Set non-existing partition - auto writeSettings = CreateWriteSessionSettings(setup); + auto writeSettings = CreateWriteSessionSettings(*setup); writeSettings.RetryPolicy(retryPolicy); writeSettings.PartitionId(1); @@ -245,7 +250,7 @@ namespace NYdb::NTopic::NTests { Cerr << "=== Alter partition count\n"; TAlterTopicSettings alterSettings; alterSettings.AlterPartitioningSettings(2, 2); - auto alterResult = client.AlterTopic(setup.GetTestTopicPath(), alterSettings).GetValueSync(); + auto alterResult = client.AlterTopic(setup->GetTestTopicPath(), alterSettings).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); Cerr << "=== Wait for repair\n"; @@ -256,14 +261,14 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(DiscoveryServiceBadPort) { - TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + auto setup = CreateSetup(TEST_CASE_NAME); TMockDiscoveryService discovery; discovery.SetEndpoints(9999, 2, 0); auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(); - auto writeSettings = CreateWriteSessionSettings(setup); + auto writeSettings = CreateWriteSessionSettings(*setup); writeSettings.RetryPolicy(retryPolicy); retryPolicy->Initialize(); @@ -276,7 +281,7 @@ namespace NYdb::NTopic::NTests { Cerr << "=== Wait for retries\n"; retryPolicy->WaitForRetriesSync(3); - discovery.SetGoodEndpoints(setup); + discovery.SetGoodEndpoints(*setup); Cerr << "=== Wait for repair\n"; retryPolicy->WaitForRepairSync(); @@ -286,14 +291,14 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(DiscoveryServiceBadNodeId) { - TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + auto setup = CreateSetup(TEST_CASE_NAME); TMockDiscoveryService discovery; - discovery.SetEndpoints(9999, setup.GetRuntime().GetNodeCount(), setup.GetGrpcPort()); + discovery.SetEndpoints(9999, setup->GetRuntime().GetNodeCount(), setup->GetGrpcPort()); auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(); - auto writeSettings = CreateWriteSessionSettings(setup); + auto writeSettings = CreateWriteSessionSettings(*setup); writeSettings.RetryPolicy(retryPolicy); retryPolicy->Initialize(); @@ -306,7 +311,7 @@ namespace NYdb::NTopic::NTests { Cerr << "=== Wait for retries\n"; retryPolicy->WaitForRetriesSync(3); - discovery.SetGoodEndpoints(setup); + discovery.SetGoodEndpoints(*setup); Cerr << "=== Wait for repair\n"; retryPolicy->WaitForRepairSync(); @@ -316,14 +321,14 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(DescribeHang) { - TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + auto setup = CreateSetup(TEST_CASE_NAME); TMockDiscoveryService discovery; discovery.SetEndpoints(9999, 2, 0); auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(TDuration::Days(1)); - auto writeSettings = CreateWriteSessionSettings(setup); + auto writeSettings = CreateWriteSessionSettings(*setup); writeSettings.RetryPolicy(retryPolicy); retryPolicy->Initialize(); @@ -338,15 +343,15 @@ namespace NYdb::NTopic::NTests { } Y_UNIT_TEST(DiscoveryHang) { - TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME); + auto setup = CreateSetup(TEST_CASE_NAME); TMockDiscoveryService discovery; - discovery.SetGoodEndpoints(setup); + discovery.SetGoodEndpoints(*setup); discovery.SetDelay(TDuration::Days(1)); Cerr << "=== Create write session\n"; TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr()))); - auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings(setup)); + auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings(*setup)); Cerr << "=== Close write session\n"; writeSession->Close(); |