aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-06-08 10:26:27 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-06-08 10:26:27 +0300
commitb62949a9443463174da1e380a0403934d8030cbe (patch)
tree63f054fb0420a8b4783575f6e09e1f6dab35d7db
parentd7d8fafed9482845dd3de0eae0bfaaf95fa4ef7a (diff)
downloadydb-b62949a9443463174da1e380a0403934d8030cbe.tar.gz
Flaky test RetryPolicy::TWriteSession_SeqNoShift
-rw-r--r--ydb/core/testlib/test_pq_client.h37
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp127
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h23
4 files changed, 122 insertions, 73 deletions
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index 4ff768299f9..dfa3d00d500 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -731,16 +731,18 @@ public:
runtime->DispatchEvents(options);
}
- ui32 TopicCreated(const TString& name, ui64 cacheSize = 0) {
+ ui32 GetTopicVersionFromMetadata(const TString& name, ui64 cacheSize = 0) {
TAutoPtr<NMsgBusProxy::TBusPersQueue> request(new NMsgBusProxy::TBusPersQueue);
auto req = request->Record.MutableMetaRequest()->MutableCmdGetTopicMetadata();
req->AddTopic(name);
-
+
+ Cerr << "GetTopicVersionFromMetadata request: " << name << Endl;
TAutoPtr<NBus::TBusMessage> reply;
NBus::EMessageStatus status = SyncCall(request, reply);
- Cerr << "Topic created - response: " << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cerr << "GetTopicVersionFromMetadata response: " << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
if (status != NBus::MESSAGE_OK)
return 0;
+
UNIT_ASSERT_VALUES_EQUAL(status, NBus::MESSAGE_OK);
const NMsgBusProxy::TBusResponse* response = dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Get());
UNIT_ASSERT(response);
@@ -766,13 +768,13 @@ public:
return topicInfo.GetConfig().GetVersion();
}
- ui32 TopicRealCreated(const TString& name) {
+ ui32 GetTopicVersionFromPath(const TString& name) {
TAutoPtr<NMsgBusProxy::TBusResponse> res = Ls("/Root/PQ/" + name);
- Cerr << res->Record << "\n";
- return res->Record.GetPathDescription().GetPersQueueGroup().GetAlterVersion();
+ ui32 version = res->Record.GetPathDescription().GetPersQueueGroup().GetAlterVersion();
+ Cerr << "GetTopicVersionFromPath: " << " record " << res->Record << " name " << name << " version" << version << "\n";
+ return version;
}
-
void RestartBalancerTablet(TTestActorRuntime* runtime, const TString& topic) {
TAutoPtr<NMsgBusProxy::TBusResponse> res = Ls("/Root/PQ/" + topic);
Cerr << res->Record << "\n";
@@ -855,11 +857,12 @@ public:
ModifyACL(pref, name.substr(pos + 1), acl.SerializeAsString());
}
-
void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true,
const TMaybe<TString>& dc = Nothing(), TVector<TString> rr = {"user"},
const TMaybe<TString>& account = Nothing(), bool expectFail = false
) {
+ Cerr << "CreateTopicNoLegacy: " << name << Endl;
+
TString path = name;
if (UseConfigTables && !path.StartsWith("/Root") && !account.Defined()) {
path = TStringBuilder() << "/Root/PQ/" << name;
@@ -906,15 +909,13 @@ public:
} while (true);
}
- void CreateTopic(
- const TRequestCreatePQ& createRequest,
- bool doWait = true
+ void CreateTopic(const TRequestCreatePQ& createRequest, bool doWait = true
) {
const TInstant start = TInstant::Now();
THolder<NMsgBusProxy::TBusPersQueue> request = createRequest.GetRequest();
- ui32 prevVersion = TopicCreated(createRequest.Topic);
+ ui32 prevVersion = GetTopicVersionFromMetadata(createRequest.Topic);
TAutoPtr<NBus::TBusMessage> reply;
const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request, reply);
UNIT_ASSERT(response);
@@ -922,11 +923,11 @@ public:
TStringBuilder() << "proxy failure: " << response->Record.DebugString());
AddTopic(createRequest.Topic);
- while (doWait && TopicRealCreated(createRequest.Topic) != prevVersion + 1) {
+ while (doWait && GetTopicVersionFromPath(createRequest.Topic) != prevVersion + 1) {
Sleep(TDuration::MilliSeconds(500));
UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT);
}
- while (doWait && TopicCreated(createRequest.Topic, prevVersion) != prevVersion + 1) {
+ while (doWait && GetTopicVersionFromMetadata(createRequest.Topic, prevVersion) != prevVersion + 1) {
Sleep(TDuration::MilliSeconds(500));
UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT);
}
@@ -984,7 +985,7 @@ public:
TRequestAlterPQ requestDescr(name, nParts, cacheSize, lifetimeS, fillPartitionConfig, mirrorFrom);
THolder<NMsgBusProxy::TBusPersQueue> request = requestDescr.GetRequest();
- ui32 prevVersion = TopicCreated(name);
+ ui32 prevVersion = GetTopicVersionFromMetadata(name);
TAutoPtr<NBus::TBusMessage> reply;
const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
@@ -994,11 +995,11 @@ public:
const TInstant start = TInstant::Now();
AlterTopic();
- while (TopicCreated(name, cacheSize) != prevVersion + 1) {
+ while (GetTopicVersionFromMetadata(name, cacheSize) != prevVersion + 1) {
Sleep(TDuration::MilliSeconds(500));
UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT);
}
- while (TopicRealCreated(name) != prevVersion + 1) {
+ while (GetTopicVersionFromPath(name) != prevVersion + 1) {
Sleep(TDuration::MilliSeconds(500));
UNIT_ASSERT(TInstant::Now() - start < ::DEFAULT_DISPATCH_TIMEOUT);
}
@@ -1394,10 +1395,12 @@ private:
public:
void AddTopic(const TString& topic, const TMaybe<TString>& dc = Nothing()) {
+ Cerr << "AddTopic: " << topic << Endl;
return AddOrRemoveTopic(topic, true, dc);
}
void RemoveTopic(const TString& topic) {
+ Cerr << "RemoveTopic: " << topic << Endl;
return AddOrRemoveTopic(topic, false);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
index 73cb9d4df72..e23f5ddaa5f 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
@@ -15,7 +15,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
Y_UNIT_TEST(TWriteSession_TestPolicy) {
TYdbPqWriterTestHelper helper(TEST_CASE_NAME);
helper.Write(true);
- helper.Policy->Initialized(); // Thus ignoring possible early retries on "cluster initializing"
+ helper.Policy->Initialize(); // Thus ignoring possible early retries on "cluster initializing"
auto doBreakDown = [&] () {
helper.Policy->ExpectBreakDown();
NThreading::TPromise<void> retriesPromise = NThreading::NewPromise();
@@ -53,7 +53,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
Y_UNIT_TEST(TWriteSession_TestBrokenPolicy) {
TYdbPqWriterTestHelper helper(TEST_CASE_NAME);
helper.Write();
- helper.Policy->Initialized();
+ helper.Policy->Initialize();
helper.Policy->ExpectFatalBreakDown();
helper.EventLoop->AllowStop();
auto f1 = helper.Write(false);
@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
settings.AllowFallbackToOtherClusters(false);
settings.RetryPolicy(retryPolicy);
- retryPolicy->Initialized();
+ retryPolicy->Initialize();
retryPolicy->ExpectBreakDown();
auto& client = setup1->GetPersQueueClient();
@@ -116,7 +116,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1);
helper->Write(true);
auto retryPolicy = helper->Policy;
- retryPolicy->Initialized();
+ retryPolicy->Initialize();
auto waitForReconnect = [&](bool enable) {
Cerr << "=== Expect breakdown\n";
@@ -158,77 +158,106 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
setup2.Start();
setup1->AddDataCenter("dc2", setup2, true);
setup1->Start();
- TString tmpSrcId = "tmp-src-seqno-shift";
- auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, TString(), true);
- auto helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId, true);
+ TString sourceId1 = SDKTestSetup::GetTestMessageGroupId() + "1";
+ TString sourceId2 = SDKTestSetup::GetTestMessageGroupId() + "2";
+ auto writer1 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, sourceId1 , true);
+ auto writer2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, sourceId2, true);
auto settings = setup1->GetWriteSessionSettings();
auto& client = setup1->GetPersQueueClient();
+
//! Fill data in dc1 with SeqNo = 1..10 for 2 different SrcId
+ Cerr << "===Write 10 messages into every writer\n";
for (auto i = 0; i != 10; i++) {
- helper->Write(true); // 1
- helper2->Write(true); // 1
+ writer1->Write(true); // 1
+ writer2->Write(true); // 1
}
+ Cerr << "===Messages were written\n";
+
Cerr << "===Disable dc1\n";
//! Leave only dc2 available
+ writer1->Policy->ExpectBreakDown();
+ writer2->Policy->ExpectBreakDown();
setup1->DisableDataCenter("dc1");
- helper->Policy->ExpectBreakDown();
- helper->Policy->WaitForRetriesSync(1);
+ writer1->Policy->WaitForRetriesSync(1);
+ writer2->Policy->WaitForRetriesSync(1);
+
+ Cerr << "===Recreate writers\n";
- //! Re-create helpers, kill previous sessions. New sessions will connect to dc2.
- helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, TString(), true);
- helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, tmpSrcId, true);
+ //! Re-create writers, kill previous sessions. New sessions will connect to dc2.
+ writer1 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, sourceId1, true);
+ writer2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, sourceId2, true);
//! Write some data and await confirmation - just to ensure sessions are started.
- helper->Write(true);
- helper2->Write(true);
+ Cerr << "===Write one message into every writer\n";
+ writer1->Write(true);
+ writer2->Write(true);
+ Cerr << "===Messages were written\n";
- helper->Policy->ExpectBreakDown();
- Cerr << "Disable dc2\n";
//! Leave no available DCs
+ writer1->Policy->ExpectBreakDown();
+ writer2->Policy->ExpectBreakDown();
+ writer1->Policy->Initialize();
+ writer2->Policy->Initialize();
+
+ Cerr << "===Disable dc2\n";
setup1->DisableDataCenter("dc2");
- Cerr << "Wait for retries after initial dc2 shutdown\n";
- helper->Policy->WaitForRetriesSync(1);
+ Cerr << "===Wait for retries after initial dc2 shutdown\n";
+ writer1->Policy->WaitForRetriesSync(1);
+ writer2->Policy->WaitForRetriesSync(1);
//! Put some data inflight. It cannot be written now, but SeqNo will be assigned.
+ Cerr << "===Write four async message into every writer\n";
for (auto i = 0; i != 4; i++) {
- helper->Write(false);
- helper2->Write(false);
+ writer1->Write(false);
+ writer2->Write(false);
}
- auto f = helper->Write(false);
- auto f2 = helper2->Write(false);
+ auto f1 = writer1->Write(false);
+ auto f2 = writer2->Write(false);
+
//! Enable DC1. The writers will now write the collected data to DC1, which has LastSeqNo = 10
//! (because of the data written in the very beginning), while the inflight data has SeqNo = 2..5 assigned,
//! so the SeqNo shift takes place.
+ Cerr << "===Enable dc2\n";
setup1->EnableDataCenter("dc1");
- Cerr << "Wait for writes to complete\n";
-
- f.Wait();
+ Cerr << "===Wait for writes to complete\n";
+ f1.Wait();
f2.Wait();
+ Cerr << "===Messages were written\n";
+
//! Writer1 is not used any more.
- helper->EventLoop->AllowStop();
- helper = nullptr;
+ writer1->EventLoop->AllowStop();
+ writer1 = nullptr;
+
+ Cerr << "===Writer 1 closed\n";
+ writer2->Policy->ExpectBreakDown();
+ writer2->Policy->Initialize();
- helper2->Policy->Initialized();
- helper2->Policy->ExpectBreakDown();
//! For the second writer, do switchback to dc2.
+ Cerr << "===Disable dc1\n";
setup1->DisableDataCenter("dc1");
- helper2->Policy->WaitForRetriesSync(1);
+ Cerr << "===Wait for retries after dc1 shutdown\n";
+ writer2->Policy->WaitForRetriesSync(1);
+
//! Put some data inflight again;
+ Cerr << "===Write four async messages into writer2\n";
for (auto i = 0; i != 4; i++) {
- helper2->Write(false);
+ writer2->Write(false);
}
- f = helper2->Write(false);
- Cerr << "Enable dc2\n";
+ f2 = writer2->Write(false);
+
+ Cerr << "===Enable dc2\n";
setup1->EnableDataCenter("dc2");
- f.Wait();
+ f2.Wait();
+ Cerr << "===Messages were written\n";
- helper2->EventLoop->AllowStop();
- helper2->Policy->ExpectBreakDown();
+ writer2->EventLoop->AllowStop();
+ writer2->Policy->ExpectBreakDown();
+ writer2 = nullptr;
- Cerr << "Enable dc1\n";
+ Cerr << "===Enable dc1\n";
setup1->EnableDataCenter("dc1");
auto CheckSeqNo = [&] (const TString& dcName, ui64 expectedSeqNo) {
settings.PreferredCluster(dcName);
@@ -242,19 +271,19 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
//! Check SeqNo in both DCs. For writer1 we expect 14 messages in DC1
//! (10 written initially + 4 written after reconnect) and 1 message in DC2 (only the initial message).
- Cerr << "Check SeqNo writer1, dc2\n";
+ settings.MessageGroupId(sourceId1);
+ Cerr << "===Check SeqNo writer1, dc2\n";
CheckSeqNo("dc2", 1);
- Cerr << "Check SeqNo writer1, dc1\n";
+ Cerr << "===Check SeqNo writer1, dc1\n";
CheckSeqNo("dc1", 14);
//! Check SeqNo for writer 2; Expect to have 6 messages on DC2 with MaxSeqNo = 6;
- helper2 = nullptr;
- settings.MessageGroupId(tmpSrcId);
- Cerr << "Check SeqNo writer2 dc2\n";
- //!DC2 has no shift in SeqNo since 5 messages were written to dc 1.
- CheckSeqNo("dc2", 15);
- Cerr << "Check SeqNo writer2 dc1\n";
+ settings.MessageGroupId(sourceId2);
+ Cerr << "===Check SeqNo writer2 dc1\n";
CheckSeqNo("dc1", 14);
+ //! DC2 has no shift in SeqNo since 5 messages were written to dc 1.
+ Cerr << "===Check SeqNo writer2 dc2\n";
+ CheckSeqNo("dc2", 15);
auto readSession = client.CreateReadSession(setup1->GetReadSessionSettings());
@@ -287,7 +316,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
for (auto& message: event.GetMessages()) {
TString sourceId = message.GetMessageGroupId();
ui32 seqNo = message.GetSeqNo();
- if (sourceId == setup1->GetTestMessageGroupId()) {
+ if (sourceId == sourceId1) {
UNIT_ASSERT_VALUES_EQUAL(seqNo, seqNoByClusterSrc1[clusterName] + 1);
seqNoByClusterSrc1[clusterName]++;
auto& msgRemaining = MsgCountByClusterSrc1[clusterName];
@@ -296,7 +325,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
if (!msgRemaining)
clustersPendingSrc1--;
} else {
- UNIT_ASSERT_VALUES_EQUAL(sourceId, tmpSrcId);
+ UNIT_ASSERT_VALUES_EQUAL(sourceId, sourceId2);
auto& prevSeqNo = SeqNoByClusterSrc2[clusterName];
if (clusterName == "dc1") {
UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
index 1f516eb6c58..b7d23d91ceb 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/sdk_test_setup.h
@@ -80,15 +80,15 @@ public:
}
}
- TString GetTestTopic() const {
- return "topic1";
+ static TString GetTestTopic() {
+ return "test-topic";
}
- TString GetTestClient() const {
+ static TString GetTestClient() {
return "test-reader";
}
- TString GetTestMessageGroupId() const {
+ static TString GetTestMessageGroupId() {
return "test-message-group-id";
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
index 5bdad935c04..dd79296998e 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
@@ -219,15 +219,25 @@ struct TYdbPqNoRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState {
struct TYdbPqTestRetryPolicy : IRetryPolicy {
TYdbPqTestRetryPolicy(const TDuration& delay = TDuration::MilliSeconds(2000))
: Delay(delay)
- {}
+ {
+ Cerr << "====TYdbPqTestRetryPolicy()\n";
+ }
IRetryState::TPtr CreateRetryState() const override {
+ Cerr << "====CreateRetryState\n";
if (AtomicSwap(&OnFatalBreakDown, 0)) {
return std::make_unique<TYdbPqNoRetryState>();
}
- if (AtomicGet(Initialized_)) {
+ if (AtomicGet(Initialized_))
+ {
+ Cerr << "====CreateRetryState Initialized\n";
auto res = AtomicSwap(&OnBreakDown, 0);
UNIT_ASSERT(res);
+ for (size_t i = 0; i < 100; i++) {
+ if (AtomicGet(CurrentRetries) == 0)
+ break;
+ Sleep(TDuration::MilliSeconds(100));
+ }
UNIT_ASSERT(AtomicGet(CurrentRetries) == 0);
}
auto retryCb = [this]() mutable {this->RetryDone();};
@@ -254,6 +264,13 @@ struct TYdbPqTestRetryPolicy : IRetryPolicy {
}
}
void ExpectBreakDown() {
+ // Either TYdbPqTestRetryPolicy() or Initialize() should be called beforehand in order to set OnBreakDown=0
+ Cerr << "====ExpectBreakDown\n";
+ for (size_t i = 0; i < 100; i++) {
+ if (AtomicGet(OnBreakDown) == 0)
+ break;
+ Sleep(TDuration::MilliSeconds(100));
+ }
UNIT_ASSERT(AtomicGet(OnBreakDown) == 0);
AtomicSet(CurrentRetries, 0);
AtomicSet(OnBreakDown, 1);
@@ -289,7 +306,7 @@ struct TYdbPqTestRetryPolicy : IRetryPolicy {
repairFuture.Wait();
}
- void Initialized() {
+ void Initialize() {
AtomicSet(Initialized_, 1);
AtomicSet(CurrentRetries, 0);
}