diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-06-08 10:26:27 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-06-08 10:26:27 +0300 |
commit | b62949a9443463174da1e380a0403934d8030cbe (patch) | |
tree | 63f054fb0420a8b4783575f6e09e1f6dab35d7db | |
parent | d7d8fafed9482845dd3de0eae0bfaaf95fa4ef7a (diff) | |
download | ydb-b62949a9443463174da1e380a0403934d8030cbe.tar.gz |
Flaky test RetryPolicy::TWriteSession_SeqNoShift
4 files changed, 122 insertions, 73 deletions
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index 4ff768299f9..dfa3d00d500 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -731,16 +731,18 @@ public: runtime->DispatchEvents(options); } - ui32 TopicCreated(const TString& name, ui64 cacheSize = 0) { + ui32 GetTopicVersionFromMetadata(const TString& name, ui64 cacheSize = 0) { TAutoPtr<NMsgBusProxy::TBusPersQueue> request(new NMsgBusProxy::TBusPersQueue); auto req = request->Record.MutableMetaRequest()->MutableCmdGetTopicMetadata(); req->AddTopic(name); - + + Cerr << "GetTopicVersionFromMetadata request: " << name << Endl; TAutoPtr<NBus::TBusMessage> reply; NBus::EMessageStatus status = SyncCall(request, reply); - Cerr << "Topic created - response: " << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cerr << "GetTopicVersionFromMetadata response: " << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; if (status != NBus::MESSAGE_OK) return 0; + UNIT_ASSERT_VALUES_EQUAL(status, NBus::MESSAGE_OK); const NMsgBusProxy::TBusResponse* response = dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Get()); UNIT_ASSERT(response); @@ -766,13 +768,13 @@ public: return topicInfo.GetConfig().GetVersion(); } - ui32 TopicRealCreated(const TString& name) { + ui32 GetTopicVersionFromPath(const TString& name) { TAutoPtr<NMsgBusProxy::TBusResponse> res = Ls("/Root/PQ/" + name); - Cerr << res->Record << "\n"; - return res->Record.GetPathDescription().GetPersQueueGroup().GetAlterVersion(); + ui32 version = res->Record.GetPathDescription().GetPersQueueGroup().GetAlterVersion(); + Cerr << "GetTopicVersionFromPath: " << " record " << res->Record << " name " << name << " version" << version << "\n"; + return version; } - void RestartBalancerTablet(TTestActorRuntime* runtime, const TString& topic) { TAutoPtr<NMsgBusProxy::TBusResponse> res = Ls("/Root/PQ/" + topic); Cerr << res->Record << "\n"; @@ -855,11 +857,12 @@ public: ModifyACL(pref, name.substr(pos + 1), acl.SerializeAsString()); } - void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true, const TMaybe<TString>& dc = Nothing(), TVector<TString> rr = {"user"}, const TMaybe<TString>& account = Nothing(), bool expectFail = false ) { + Cerr << "CreateTopicNoLegacy: " << name << Endl; + TString path = name; if (UseConfigTables && !path.StartsWith("/Root") && !account.Defined()) { path = TStringBuilder() << "/Root/PQ/" << name; @@ -906,15 +909,13 @@ public: } while (true); } - void CreateTopic( - const TRequestCreatePQ& createRequest, - bool doWait = true + void CreateTopic(const TRequestCreatePQ& createRequest, bool doWait = true ) { const TInstant start = TInstant::Now(); THolder<NMsgBusProxy::TBusPersQueue> request = createRequest.GetRequest(); - ui32 prevVersion = TopicCreated(createRequest.Topic); + ui32 prevVersion = GetTopicVersionFromMetadata(createRequest.Topic); TAutoPtr<NBus::TBusMessage> reply; const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request, reply); UNIT_ASSERT(response); @@ -922,11 +923,11 @@ public: TStringBuilder() << "proxy failure: " << response->Record.DebugString()); AddTopic(createRequest.Topic); - while (doWait && TopicRealCreated(createRequest.Topic) != prevVersion + 1) { + while (doWait && GetTopicVersionFromPath(createRequest.Topic) != prevVersion + 1) { Sleep(TDuration::MilliSeconds(500)); UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT); } - while (doWait && TopicCreated(createRequest.Topic, prevVersion) != prevVersion + 1) { + while (doWait && GetTopicVersionFromMetadata(createRequest.Topic, prevVersion) != prevVersion + 1) { Sleep(TDuration::MilliSeconds(500)); UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT); } @@ -984,7 +985,7 @@ public: TRequestAlterPQ requestDescr(name, nParts, cacheSize, lifetimeS, fillPartitionConfig, mirrorFrom); THolder<NMsgBusProxy::TBusPersQueue> request = requestDescr.GetRequest(); - ui32 prevVersion = TopicCreated(name); + ui32 prevVersion = GetTopicVersionFromMetadata(name); TAutoPtr<NBus::TBusMessage> reply; const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); @@ -994,11 +995,11 @@ public: const TInstant start = TInstant::Now(); AlterTopic(); - while (TopicCreated(name, cacheSize) != prevVersion + 1) { + while (GetTopicVersionFromMetadata(name, cacheSize) != prevVersion + 1) { Sleep(TDuration::MilliSeconds(500)); UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT); } - while (TopicRealCreated(name) != prevVersion + 1) { + while (GetTopicVersionFromPath(name) != prevVersion + 1) { Sleep(TDuration::MilliSeconds(500)); UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT); } @@ -1394,10 +1395,12 @@ private: public: void AddTopic(const TString& topic, const TMaybe<TString>& dc = Nothing()) { + Cerr << "AddTopic: " << topic << Endl; return AddOrRemoveTopic(topic, true, dc); } void RemoveTopic(const TString& topic) { + Cerr << "RemoveTopic: " << topic << Endl; return AddOrRemoveTopic(topic, false); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp index 73cb9d4df72..e23f5ddaa5f 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp @@ -15,7 +15,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { Y_UNIT_TEST(TWriteSession_TestPolicy) { TYdbPqWriterTestHelper helper(TEST_CASE_NAME); helper.Write(true); - helper.Policy->Initialized(); // Thus ignoring possible early retries on "cluster initializing" + helper.Policy->Initialize(); // Thus ignoring possible early retries on "cluster initializing" auto doBreakDown = [&] () { helper.Policy->ExpectBreakDown(); NThreading::TPromise<void> retriesPromise = NThreading::NewPromise(); @@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { Y_UNIT_TEST(TWriteSession_TestBrokenPolicy) { TYdbPqWriterTestHelper helper(TEST_CASE_NAME); helper.Write(); - helper.Policy->Initialized(); + helper.Policy->Initialize(); helper.Policy->ExpectFatalBreakDown(); helper.EventLoop->AllowStop(); auto f1 = helper.Write(false); @@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { settings.AllowFallbackToOtherClusters(false); settings.RetryPolicy(retryPolicy); - retryPolicy->Initialized(); + retryPolicy->Initialize(); retryPolicy->ExpectBreakDown(); auto& client = setup1->GetPersQueueClient(); @@ -116,7 +116,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1); helper->Write(true); auto retryPolicy = helper->Policy; - retryPolicy->Initialized(); + retryPolicy->Initialize(); auto waitForReconnect = [&](bool enable) { Cerr << "=== Expect breakdown\n"; @@ -158,77 +158,106 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { setup2.Start(); setup1->AddDataCenter("dc2", setup2, true); setup1->Start(); - TString tmpSrcId = "tmp-src-seqno-shift"; - auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, TString(), true); - auto helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId, true); + TString sourceId1 = SDKTestSetup::GetTestMessageGroupId() + "1"; + TString sourceId2 = SDKTestSetup::GetTestMessageGroupId() + "2"; + auto writer1 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, sourceId1 , true); + auto writer2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, sourceId2, true); auto settings = setup1->GetWriteSessionSettings(); auto& client = setup1->GetPersQueueClient(); + //! Fill data in dc1 1 with SeqNo = 1..10 for 2 different SrcId + Cerr << "===Write 10 messages into every writer\n"; for (auto i = 0; i != 10; i++) { - helper->Write(true); // 1 - helper2->Write(true); // 1 + writer1->Write(true); // 1 + writer2->Write(true); // 1 } + Cerr << "===Messages were written\n"; + Cerr << "===Disable dc1\n"; //! Leave only dc2 available + writer1->Policy->ExpectBreakDown(); + writer2->Policy->ExpectBreakDown(); setup1->DisableDataCenter("dc1"); - helper->Policy->ExpectBreakDown(); - helper->Policy->WaitForRetriesSync(1); + writer1->Policy->WaitForRetriesSync(1); + writer2->Policy->WaitForRetriesSync(1); + + Cerr << "===Recreate writers\n"; - //! Re-create helpers, kill previous sessions. New sessions will connect to dc2. - helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, TString(), true); - helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, tmpSrcId, true); + //! Re-create writers, kill previous sessions. New sessions will connect to dc2. + writer1 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, sourceId1, true); + writer2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, sourceId2, true); //! Write some data and await confirmation - just to ensure sessions are started. - helper->Write(true); - helper2->Write(true); + Cerr << "===Write one message into every writer\n"; + writer1->Write(true); + writer2->Write(true); + Cerr << "===Messages were written\n"; - helper->Policy->ExpectBreakDown(); - Cerr << "Disable dc2\n"; //! Leave no available DCs + writer1->Policy->ExpectBreakDown(); + writer2->Policy->ExpectBreakDown(); + writer1->Policy->Initialize(); + writer2->Policy->Initialize(); + + Cerr << "===Disable dc2\n"; setup1->DisableDataCenter("dc2"); - Cerr << "Wait for retries after initial dc2 shutdown\n"; - helper->Policy->WaitForRetriesSync(1); + Cerr << "===Wait for retries after initial dc2 shutdown\n"; + writer1->Policy->WaitForRetriesSync(1); + writer2->Policy->WaitForRetriesSync(1); //! Put some data inflight. It cannot be written now, but SeqNo will be assigned. + Cerr << "===Write four async message into every writer\n"; for (auto i = 0; i != 4; i++) { - helper->Write(false); - helper2->Write(false); + writer1->Write(false); + writer2->Write(false); } - auto f = helper->Write(false); - auto f2 = helper2->Write(false); + auto f1 = writer1->Write(false); + auto f2 = writer2->Write(false); + //! Enable DC1. Now writers gonna write collected data to DC1 having LastSeqNo = 10 //! (because of data written in the very beginning), and inflight data has SeqNo assigned = 2..5, //! so the SeqNo shift takes place. + Cerr << "===Enable dc2\n"; setup1->EnableDataCenter("dc1"); - Cerr << "Wait for writes to complete\n"; - - f.Wait(); + Cerr << "===Wait for writes to complete\n"; + f1.Wait(); f2.Wait(); + Cerr << "===Messages were written\n"; + //! Writer1 is not used any more. - helper->EventLoop->AllowStop(); - helper = nullptr; + writer1->EventLoop->AllowStop(); + writer1 = nullptr; + + Cerr << "===Writer 1 closed\n"; + writer2->Policy->ExpectBreakDown(); + writer2->Policy->Initialize(); - helper2->Policy->Initialized(); - helper2->Policy->ExpectBreakDown(); //! For the second writer, do switchback to dc2. + Cerr << "===Disable dc1\n"; setup1->DisableDataCenter("dc1"); - helper2->Policy->WaitForRetriesSync(1); + Cerr << "===Wait for retries after dc1 shutdown\n"; + writer2->Policy->WaitForRetriesSync(1); + //! Put some data inflight again; + Cerr << "===Write four async messages into writer2\n"; for (auto i = 0; i != 4; i++) { - helper2->Write(false); + writer2->Write(false); } - f = helper2->Write(false); - Cerr << "Enable dc2\n"; + f2 = writer2->Write(false); + + Cerr << "===Enable dc2\n"; setup1->EnableDataCenter("dc2"); - f.Wait(); + f2.Wait(); + Cerr << "===Messages were written\n"; - helper2->EventLoop->AllowStop(); - helper2->Policy->ExpectBreakDown(); + writer2->EventLoop->AllowStop(); + writer2->Policy->ExpectBreakDown(); + writer2 = nullptr; - Cerr << "Enable dc1\n"; + Cerr << "===Enable dc1\n"; setup1->EnableDataCenter("dc1"); auto CheckSeqNo = [&] (const TString& dcName, ui64 expectedSeqNo) { settings.PreferredCluster(dcName); @@ -242,19 +271,19 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { //!check SeqNo in both DC. For writer1 We expect 14 messages in DC1 //! (10 written initially + 4 written after reconnect) and 1 message in DC2 (only initial message). - Cerr << "Check SeqNo writer1, dc2\n"; + settings.MessageGroupId(sourceId1); + Cerr << "===Check SeqNo writer1, dc2\n"; CheckSeqNo("dc2", 1); - Cerr << "Check SeqNo writer1, dc1\n"; + Cerr << "===Check SeqNo writer1, dc1\n"; CheckSeqNo("dc1", 14); //! Check SeqNo for writer 2; Expect to have 6 messages on DC2 with MaxSeqNo = 6; - helper2 = nullptr; - settings.MessageGroupId(tmpSrcId); - Cerr << "Check SeqNo writer2 dc2\n"; - //!DC2 has no shift in SeqNo since 5 messages were written to dc 1. - CheckSeqNo("dc2", 15); - Cerr << "Check SeqNo writer2 dc1\n"; + settings.MessageGroupId(sourceId2); + Cerr << "===Check SeqNo writer2 dc1\n"; CheckSeqNo("dc1", 14); + //! DC2 has no shift in SeqNo since 5 messages were written to dc 1. + Cerr << "===Check SeqNo writer2 dc2\n"; + CheckSeqNo("dc2", 15); auto readSession = client.CreateReadSession(setup1->GetReadSessionSettings()); @@ -287,7 +316,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { for (auto& message: event.GetMessages()) { TString sourceId = message.GetMessageGroupId(); ui32 seqNo = message.GetSeqNo(); - if (sourceId == setup1->GetTestMessageGroupId()) { + if (sourceId == sourceId1) { UNIT_ASSERT_VALUES_EQUAL(seqNo, seqNoByClusterSrc1[clusterName] + 1); seqNoByClusterSrc1[clusterName]++; auto& msgRemaining = MsgCountByClusterSrc1[clusterName]; @@ -296,7 +325,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { if (!msgRemaining) clustersPendingSrc1--; } else { - UNIT_ASSERT_VALUES_EQUAL(sourceId, tmpSrcId); + UNIT_ASSERT_VALUES_EQUAL(sourceId, sourceId2); auto& prevSeqNo = SeqNoByClusterSrc2[clusterName]; if (clusterName == "dc1") { UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h index 1f516eb6c58..b7d23d91ceb 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h @@ -80,15 +80,15 @@ public: } } - TString GetTestTopic() const { - return "topic1"; + static TString GetTestTopic() { + return "test-topic"; } - TString GetTestClient() const { + static TString GetTestClient() { return "test-reader"; } - TString GetTestMessageGroupId() const { + static TString GetTestMessageGroupId() { return "test-message-group-id"; } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h index 5bdad935c04..dd79296998e 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h @@ -219,15 +219,25 @@ struct TYdbPqNoRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState { struct TYdbPqTestRetryPolicy : IRetryPolicy { TYdbPqTestRetryPolicy(const TDuration& delay = TDuration::MilliSeconds(2000)) : Delay(delay) - {} + { + Cerr << "====TYdbPqTestRetryPolicy()\n"; + } IRetryState::TPtr CreateRetryState() const override { + Cerr << "====CreateRetryState\n"; if (AtomicSwap(&OnFatalBreakDown, 0)) { return std::make_unique<TYdbPqNoRetryState>(); } - if (AtomicGet(Initialized_)) { + if (AtomicGet(Initialized_)) + { + Cerr << "====CreateRetryState Initialized\n"; auto res = AtomicSwap(&OnBreakDown, 0); UNIT_ASSERT(res); + for (size_t i = 0; i < 100; i++) { + if (AtomicGet(CurrentRetries) == 0) + break; + Sleep(TDuration::MilliSeconds(100)); + } UNIT_ASSERT(AtomicGet(CurrentRetries) == 0); } auto retryCb = [this]() mutable {this->RetryDone();}; @@ -254,6 +264,13 @@ struct TYdbPqTestRetryPolicy : IRetryPolicy { } } void ExpectBreakDown() { + // Either TYdbPqTestRetryPolicy() or Initialize() should be called beforehand in order to set OnBreakDown=0 + Cerr << "====ExpectBreakDown\n"; + for (size_t i = 0; i < 100; i++) { + if (AtomicGet(OnBreakDown) == 0) + break; + Sleep(TDuration::MilliSeconds(100)); + } UNIT_ASSERT(AtomicGet(OnBreakDown) == 0); AtomicSet(CurrentRetries, 0); AtomicSet(OnBreakDown, 1); @@ -289,7 +306,7 @@ struct TYdbPqTestRetryPolicy : IRetryPolicy { repairFuture.Wait(); } - void Initialized() { + void Initialize() { AtomicSet(Initialized_, 1); AtomicSet(CurrentRetries, 0); } |