diff options
author | komels <komels@ydb.tech> | 2022-12-15 14:08:46 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2022-12-15 14:08:46 +0300 |
commit | 3070c123fb92e1bde410bbe6a725079cfebd19e3 (patch) | |
tree | ddbc2cef05c227fb623843d5780ccbc4bb4cb367 | |
parent | a3a067dc098f870195f22bac6071ab85d6bcb041 (diff) | |
download | ydb-3070c123fb92e1bde410bbe6a725079cfebd19e3.tar.gz |
Fix tests in sdk
3 files changed, 66 insertions, 36 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 24cf6651cd..685efd1e14 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -246,8 +246,9 @@ void TWriteSession::OnCdsResponse( OnSeqNoShift = false; } else { // Switched from initial cluster to second one; Y_VERIFY(CurrentCluster == InitialCluster); - if (AutoSeqNoMode.GetOrElse(true)) + if (AutoSeqNoMode.GetOrElse(true)) { OnSeqNoShift = true; + } } } @@ -694,7 +695,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl( // SeqNo increased, so there's a risk of loss, apply SeqNo shift. // MinUnsentSeqNo must be > 0 if anything was ever sent yet if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { - SeqNoShift = newLastSeqNo - MinUnsentSeqNo; + SeqNoShift = newLastSeqNo - MinUnsentSeqNo; } result.InitSeqNo = newLastSeqNo; LastSeqNo = newLastSeqNo; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp index 2e98ba792c..eb8e2cc1ef 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp @@ -158,10 +158,9 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { setup2.Start(); setup1->AddDataCenter("dc2", setup2, true); setup1->Start(); - TString tmpSrcId = "tmp-src-seqno-shift", noShiftSrcId = "tmp-src-no-seqno-shift"; - auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1); - auto helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId); - //auto helperNoShift = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId); + TString tmpSrcId = "tmp-src-seqno-shift"; + auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, TString(), true); + auto helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId, true); auto settings = setup1->GetWriteSessionSettings(); auto& client = setup1->GetPersQueueClient(); @@ -177,18 +176,18 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { helper->Policy->WaitForRetriesSync(1); //! Re-create helpers, kill previous sessions. New sessions will connect to dc2. - helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1); - helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, tmpSrcId); + helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, TString(), true); + helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, tmpSrcId, true); //! Write some data and await confirmation - just to ensure sessions are started. helper->Write(true); helper2->Write(true); helper->Policy->ExpectBreakDown(); - Cerr << "===Disable dc2\n"; + Cerr << "Disable dc2\n"; //! Leave no available DCs setup1->DisableDataCenter("dc2"); - Cerr << "=== Wait for retries after initial dc2 shutdown\n"; + Cerr << "Wait for retries after initial dc2 shutdown\n"; helper->Policy->WaitForRetriesSync(1); //! Put some data inflight. It cannot be written now, but SeqNo will be assigned. @@ -199,11 +198,11 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { auto f = helper->Write(false); auto f2 = helper2->Write(false); //! Enable DC1. Now writers gonna write collected data to DC1 having LastSeqNo = 10 - //! (because of data written and the very beginning), and inflight data has SeqNo assingned = 2..5, + //! (because of data written in the very beginning), and inflight data has SeqNo assigned = 2..5, //! so the SeqNo shift takes place. setup1->EnableDataCenter("dc1"); - Cerr << "=====Wait for writes to complete\n"; + Cerr << "Wait for writes to complete\n"; f.Wait(); f2.Wait(); //! Writer1 is not used any more. @@ -220,13 +219,13 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { helper2->Write(false); } f = helper2->Write(false); - Cerr << "===Enable dc2\n"; + Cerr << "Enable dc2\n"; setup1->EnableDataCenter("dc2"); f.Wait(); helper2->EventLoop->AllowStop(); helper2->Policy->ExpectBreakDown(); - Cerr << "===Enable dc1\n"; + Cerr << "Enable dc1\n"; setup1->EnableDataCenter("dc1"); auto CheckSeqNo = [&] (const TString& dcName, ui64 expectedSeqNo) { settings.PreferredCluster(dcName); @@ -239,19 +238,19 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { }; //!check SeqNo in both DC. For writer1 We expect 14 messages in DC1 - //! (10 written initially + 4 wriiten after recoonect) and 1 message in DC2 (only initial message). - Cerr << "===Check SeqNo writer1, dc2\n"; + //! (10 written initially + 4 written after reconnect) and 1 message in DC2 (only initial message). + Cerr << "Check SeqNo writer1, dc2\n"; CheckSeqNo("dc2", 1); - Cerr << "===Check SeqNo writer1, dc1\n"; + Cerr << "Check SeqNo writer1, dc1\n"; CheckSeqNo("dc1", 14); //! Check SeqNo for writer 2; Expect to have 6 messages on DC2 with MaxSeqNo = 6; helper2 = nullptr; settings.MessageGroupId(tmpSrcId); - Cerr << "===Check SeqNo writer2 dc2\n"; - //!DC2 has no a ap in SeqNo since 5 messages were written ot dc 1. - CheckSeqNo("dc2", 11); - Cerr << "===Check SeqNo writer2 dc1\n"; + Cerr << "Check SeqNo writer2 dc2\n"; + //!DC2 has no shift in SeqNo since 5 messages were written to dc 1. + CheckSeqNo("dc2", 15); + Cerr << "Check SeqNo writer2 dc1\n"; CheckSeqNo("dc1", 14); @@ -296,13 +295,19 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { } else { UNIT_ASSERT_VALUES_EQUAL(sourceId, tmpSrcId); auto& prevSeqNo = SeqNoByClusterSrc2[clusterName]; - if (clusterName == "dc1" || prevSeqNo != 1) { + if (clusterName == "dc1") { UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1); prevSeqNo++; } else { UNIT_ASSERT_VALUES_EQUAL(clusterName, "dc2"); - UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 6); - prevSeqNo+= 6; + if (prevSeqNo == 0) { + UNIT_ASSERT_VALUES_EQUAL(seqNo, 1); + } else if (prevSeqNo == 1) { + UNIT_ASSERT_VALUES_EQUAL(seqNo, 11); + } else { + UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1); + } + prevSeqNo = seqNo; } auto& msgRemaining = MsgCountByClusterSrc2[clusterName]; UNIT_ASSERT(msgRemaining > 0); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h index 69d62dfa48..5bdad935c0 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h @@ -78,10 +78,12 @@ public: IRetryPolicy::TPtr retryPolicy = nullptr, IExecutor::TPtr compressExecutor = nullptr, const TString& preferredCluster = TString(), - const TString& sourceId = TString() + const TString& sourceId = TString(), + bool autoSeqNo = false ) : IClientEventLoop() , Setup(setup) + , AutoSeqNo(autoSeqNo) { Log = Setup->GetLog(); Thread = std::make_unique<TThread>([setup, retryPolicy, compressExecutor, preferredCluster, sourceId, this]() { @@ -101,6 +103,7 @@ public: TMaybe<TContinuationToken> continueToken; NThreading::TFuture<void> waitEventFuture = writer->WaitEvent(); THashMap<ui64, NThreading::TPromise<::NPersQueue::TWriteResult>> ackPromiseBySequenceNumber; + TDeque<NThreading::TPromise<::NPersQueue::TWriteResult>> ackPromiseQueue; while (!MustStop) { if (!continueToken) { Log << TLOG_INFO << "Wait for writer event"; @@ -113,12 +116,16 @@ public: waitEventFuture = writer->WaitEvent(); std::visit(TOverloaded { [&](const TWriteSessionEvent::TAcksEvent& event) { - TVector<ui64> sequenceNumbers; for (const auto& ack : event.Acks) { - UNIT_ASSERT(ackPromiseBySequenceNumber.contains(ack.SeqNo)); - sequenceNumbers.push_back(ack.SeqNo); - ackPromiseBySequenceNumber[ack.SeqNo].SetValue({true, false}); - ackPromiseBySequenceNumber.erase(ack.SeqNo); + if (AutoSeqNo) { + UNIT_ASSERT(!ackPromiseQueue.empty()); + ackPromiseQueue.front().SetValue({true, false}); + ackPromiseQueue.pop_front(); + } else { + UNIT_ASSERT(ackPromiseBySequenceNumber.contains(ack.SeqNo)); + ackPromiseBySequenceNumber[ack.SeqNo].SetValue({true, false}); + ackPromiseBySequenceNumber.erase(ack.SeqNo); + } } }, [&](TWriteSessionEvent::TReadyToAcceptEvent& event) { @@ -126,11 +133,12 @@ public: continueToken = std::move(event.ContinuationToken); }, [&](const TSessionClosedEvent& event) { - Log << TLOG_INFO << "===Got close event: " << event.DebugString() << Endl; + Log << TLOG_INFO << "Got close event: " << event.DebugString() << Endl; if (!MayStop) { UNIT_ASSERT(MustStop); UNIT_ASSERT(MessageBuffer.IsEmpty()); UNIT_ASSERT(ackPromiseBySequenceNumber.empty()); + UNIT_ASSERT(ackPromiseQueue.empty()); } else { MustStop = true; closed = true; @@ -142,13 +150,24 @@ public: if (continueToken && !MessageBuffer.IsEmpty()) { ::NPersQueue::TAcknowledgableMessage acknowledgeableMessage; Y_VERIFY(MessageBuffer.Dequeue(acknowledgeableMessage)); - ackPromiseBySequenceNumber.emplace(acknowledgeableMessage.SequenceNumber, acknowledgeableMessage.AckPromise); + if (AutoSeqNo) { + ackPromiseQueue.emplace_back(acknowledgeableMessage.AckPromise); + } else { + ackPromiseBySequenceNumber.emplace(acknowledgeableMessage.SequenceNumber, + acknowledgeableMessage.AckPromise); + } Y_VERIFY(continueToken); - Log << TLOG_INFO << "Write messages with sequence numbers " << acknowledgeableMessage.SequenceNumber; + + TMaybe<ui64> seqNo = Nothing(); + if (!AutoSeqNo) { + seqNo = acknowledgeableMessage.SequenceNumber; + Log << TLOG_INFO << "[" << sourceId << "] Write messages with sequence numbers " + << acknowledgeableMessage.SequenceNumber; + } writer->Write( std::move(*continueToken), std::move(acknowledgeableMessage.Value), - acknowledgeableMessage.SequenceNumber, + seqNo, acknowledgeableMessage.CreatedAt ); continueToken = Nothing(); @@ -161,6 +180,9 @@ public: }); Thread->Start(); } + +private: + bool AutoSeqNo; }; struct TYdbPqTestRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState { @@ -369,14 +391,16 @@ public: std::shared_ptr<TLockFreeQueue<ui64>> executorQueue = nullptr, const TString& preferredCluster = TString(), std::shared_ptr<TPersQueueYdbSdkTestSetup> setup = nullptr, - const TString& sourceId = TString() + const TString& sourceId = TString(), + bool autoSeqNo = false ) : Setup(setup ? setup : std::make_shared<TPersQueueYdbSdkTestSetup>(name)) , Policy(std::make_shared<TYdbPqTestRetryPolicy>()) { if (executorQueue) CompressExecutor = MakeIntrusive<TYdbPqTestExecutor>(executorQueue); - EventLoop = std::make_unique<TYDBClientEventLoop>(Setup, Policy, CompressExecutor, preferredCluster, sourceId); + EventLoop = std::make_unique<TYDBClientEventLoop>(Setup, Policy, CompressExecutor, preferredCluster, sourceId, + autoSeqNo); } NThreading::TFuture<::NPersQueue::TWriteResult> Write(bool doWait = false, const TString& message = TString()) { |