aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-08-11 09:04:19 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-08-11 09:43:18 +0300
commit272a427545ac185dd72a5cae6050e608e19e022e (patch)
tree5b8dab06f9f61c421ebbf392b51dad14e15dfd26
parent1d3bb5ec7d2ebb46ee79de9518f5655e3cf46056 (diff)
downloadydb-272a427545ac185dd72a5cae6050e608e19e022e.tar.gz
Check NodeId in TEvClientConnected
-rw-r--r--ydb/core/persqueue/writer/writer.cpp17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp43
4 files changed, 46 insertions, 27 deletions
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index d324772fbb..6c52faf1aa 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -453,11 +453,20 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
Y_VERIFY_DEBUG_S(msg->Generation, "Tablet generation should be greater than 0");
- if (ExpectedGeneration && *ExpectedGeneration != msg->Generation)
+ if (ExpectedGeneration)
{
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation);
- Disconnected();
- PassAway();
+ if(*ExpectedGeneration != msg->Generation)
+ {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation);
+ Disconnected();
+ PassAway();
+ }
+ if (NActors::TActivationContext::ActorSystem()->NodeId != msg->ServerId.NodeId())
+ {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId());
+ Disconnected();
+ PassAway();
+ }
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
index fd442f48ce..cf1d08f008 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
@@ -23,21 +23,23 @@ public:
SDKTestSetup(const TString& testCaseName, bool start = true,
const TVector<NKikimrServices::EServiceKikimr>& logServices = TTestServer::LOGGED_SERVICES,
NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG,
+ ui32 nodeCount = 2,
size_t topicPartitionsCount = 1)
: TestCaseName(testCaseName)
, Server(false, Nothing(), logServices, logPriority)
, TopicPartitionsCount(topicPartitionsCount)
{
- InitOptions();
+ InitOptions(nodeCount);
if (start) {
Start();
}
}
- void InitOptions() {
+ void InitOptions(ui32 nodeCount = 2) {
Log.SetFormatter([testCaseName = TestCaseName](ELogPriority priority, TStringBuf message) {
return TStringBuilder() << TInstant::Now() << " :" << testCaseName << " " << priority << ": " << message << Endl;
});
+ Server.ServerSettings.SetNodeCount(nodeCount);
Server.GrpcServerOptions.SetGRpcShutdownDeadline(TDuration::Max());
// Default TTestServer value for 'MaxReadCookies' is 10. With this value the tests are flapping with two errors:
// 1. 'got more than 10 unordered cookies to commit 12'
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
index 80732d1809..b43e369e22 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
@@ -17,8 +17,11 @@ class TPersQueueYdbSdkTestSetup : public ::NPersQueue::SDKTestSetup {
TAdaptiveLock Lock;
public:
TPersQueueYdbSdkTestSetup(const TString& testCaseName, bool start = true,
- const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG, size_t topicPartitionsCount = 1)
- : SDKTestSetup(testCaseName, start, logServices, logPriority, topicPartitionsCount)
+ const TVector<NKikimrServices::EServiceKikimr>& logServices = ::NPersQueue::TTestServer::LOGGED_SERVICES,
+ NActors::NLog::EPriority logPriority = NActors::NLog::PRI_DEBUG,
+ ui32 nodeCount = 2,
+ size_t topicPartitionsCount = 1)
+ : SDKTestSetup(testCaseName, start, logServices, logPriority, nodeCount, topicPartitionsCount)
{
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
index 322123d899..96b10a8464 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/local_partition_ut.cpp
@@ -18,6 +18,10 @@ using namespace NYdb::NPersQueue::NTests;
namespace NYdb::NTopic::NTests {
Y_UNIT_TEST_SUITE(LocalPartition) {
+ std::shared_ptr<TPersQueueYdbSdkTestSetup> CreateSetup(TString testCaseName, ui32 nodeCount = 1) {
+ return std::make_shared<TPersQueueYdbSdkTestSetup>(testCaseName, true, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, nodeCount, 1);
+ }
+
NYdb::TDriverConfig CreateConfig(TString discoveryAddr)
{
return NYdb::TDriverConfig()
@@ -138,9 +142,9 @@ namespace NYdb::NTopic::NTests {
UNIT_ASSERT(context);
- if(Delay)
+ if (Delay)
{
- Cerr << "==== Delay " << Delay << " before ListEndpoints request" << Endl;
+ Cerr << "==== Delay " << Delay << " before ListEndpoints request" << Endl;
TInstant start = TInstant::Now();
while (start + Delay < TInstant::Now())
{
@@ -186,7 +190,7 @@ namespace NYdb::NTopic::NTests {
std::shared_ptr<TMockDiscoveryService> MockDiscoveryService;
};
- auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(testCaseName);
+ auto setup = CreateSetup(testCaseName);
if (!mockDiscoveryService)
{
@@ -220,15 +224,16 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(DescribeBadPartition) {
- TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ auto setup = CreateSetup(TEST_CASE_NAME);
+
TMockDiscoveryService discovery;
- discovery.SetGoodEndpoints(setup);
+ discovery.SetGoodEndpoints(*setup);
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
// Set non-existing partition
- auto writeSettings = CreateWriteSessionSettings(setup);
+ auto writeSettings = CreateWriteSessionSettings(*setup);
writeSettings.RetryPolicy(retryPolicy);
writeSettings.PartitionId(1);
@@ -245,7 +250,7 @@ namespace NYdb::NTopic::NTests {
Cerr << "=== Alter partition count\n";
TAlterTopicSettings alterSettings;
alterSettings.AlterPartitioningSettings(2, 2);
- auto alterResult = client.AlterTopic(setup.GetTestTopicPath(), alterSettings).GetValueSync();
+ auto alterResult = client.AlterTopic(setup->GetTestTopicPath(), alterSettings).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Cerr << "=== Wait for repair\n";
@@ -256,14 +261,14 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(DiscoveryServiceBadPort) {
- TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ auto setup = CreateSetup(TEST_CASE_NAME);
TMockDiscoveryService discovery;
discovery.SetEndpoints(9999, 2, 0);
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
- auto writeSettings = CreateWriteSessionSettings(setup);
+ auto writeSettings = CreateWriteSessionSettings(*setup);
writeSettings.RetryPolicy(retryPolicy);
retryPolicy->Initialize();
@@ -276,7 +281,7 @@ namespace NYdb::NTopic::NTests {
Cerr << "=== Wait for retries\n";
retryPolicy->WaitForRetriesSync(3);
- discovery.SetGoodEndpoints(setup);
+ discovery.SetGoodEndpoints(*setup);
Cerr << "=== Wait for repair\n";
retryPolicy->WaitForRepairSync();
@@ -286,14 +291,14 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(DiscoveryServiceBadNodeId) {
- TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ auto setup = CreateSetup(TEST_CASE_NAME);
TMockDiscoveryService discovery;
- discovery.SetEndpoints(9999, setup.GetRuntime().GetNodeCount(), setup.GetGrpcPort());
+ discovery.SetEndpoints(9999, setup->GetRuntime().GetNodeCount(), setup->GetGrpcPort());
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
- auto writeSettings = CreateWriteSessionSettings(setup);
+ auto writeSettings = CreateWriteSessionSettings(*setup);
writeSettings.RetryPolicy(retryPolicy);
retryPolicy->Initialize();
@@ -306,7 +311,7 @@ namespace NYdb::NTopic::NTests {
Cerr << "=== Wait for retries\n";
retryPolicy->WaitForRetriesSync(3);
- discovery.SetGoodEndpoints(setup);
+ discovery.SetGoodEndpoints(*setup);
Cerr << "=== Wait for repair\n";
retryPolicy->WaitForRepairSync();
@@ -316,14 +321,14 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(DescribeHang) {
- TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ auto setup = CreateSetup(TEST_CASE_NAME);
TMockDiscoveryService discovery;
discovery.SetEndpoints(9999, 2, 0);
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>(TDuration::Days(1));
- auto writeSettings = CreateWriteSessionSettings(setup);
+ auto writeSettings = CreateWriteSessionSettings(*setup);
writeSettings.RetryPolicy(retryPolicy);
retryPolicy->Initialize();
@@ -338,15 +343,15 @@ namespace NYdb::NTopic::NTests {
}
Y_UNIT_TEST(DiscoveryHang) {
- TPersQueueYdbSdkTestSetup setup(TEST_CASE_NAME);
+ auto setup = CreateSetup(TEST_CASE_NAME);
TMockDiscoveryService discovery;
- discovery.SetGoodEndpoints(setup);
+ discovery.SetGoodEndpoints(*setup);
discovery.SetDelay(TDuration::Days(1));
Cerr << "=== Create write session\n";
TTopicClient client(TDriver(CreateConfig(discovery.GetDiscoveryAddr())));
- auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings(setup));
+ auto writeSession = client.CreateWriteSession(CreateWriteSessionSettings(*setup));
Cerr << "=== Close write session\n";
writeSession->Close();