aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2022-12-15 14:08:46 +0300
committerkomels <komels@ydb.tech>2022-12-15 14:08:46 +0300
commit3070c123fb92e1bde410bbe6a725079cfebd19e3 (patch)
treeddbc2cef05c227fb623843d5780ccbc4bb4cb367
parenta3a067dc098f870195f22bac6071ab85d6bcb041 (diff)
downloadydb-3070c123fb92e1bde410bbe6a725079cfebd19e3.tar.gz
Fix tests in sdk
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp49
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h48
3 files changed, 66 insertions, 36 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 24cf6651cd..685efd1e14 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -246,8 +246,9 @@ void TWriteSession::OnCdsResponse(
OnSeqNoShift = false;
} else { // Switched from initial cluster to second one;
Y_VERIFY(CurrentCluster == InitialCluster);
- if (AutoSeqNoMode.GetOrElse(true))
+ if (AutoSeqNoMode.GetOrElse(true)) {
OnSeqNoShift = true;
+ }
}
}
@@ -694,7 +695,7 @@ TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl(
// SeqNo increased, so there's a risk of loss, apply SeqNo shift.
// MinUnsentSeqNo must be > 0 if anything was ever sent yet
if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) {
- SeqNoShift = newLastSeqNo - MinUnsentSeqNo;
+ SeqNoShift = newLastSeqNo - MinUnsentSeqNo;
}
result.InitSeqNo = newLastSeqNo;
LastSeqNo = newLastSeqNo;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
index 2e98ba792c..eb8e2cc1ef 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
@@ -158,10 +158,9 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
setup2.Start();
setup1->AddDataCenter("dc2", setup2, true);
setup1->Start();
- TString tmpSrcId = "tmp-src-seqno-shift", noShiftSrcId = "tmp-src-no-seqno-shift";
- auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1);
- auto helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId);
- //auto helperNoShift = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId);
+ TString tmpSrcId = "tmp-src-seqno-shift";
+ auto helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, TString(), true);
+ auto helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, "dc1", setup1, tmpSrcId, true);
auto settings = setup1->GetWriteSessionSettings();
auto& client = setup1->GetPersQueueClient();
@@ -177,18 +176,18 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
helper->Policy->WaitForRetriesSync(1);
//! Re-create helpers, kill previous sessions. New sessions will connect to dc2.
- helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1);
- helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, tmpSrcId);
+ helper = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, TString(), true);
+ helper2 = MakeHolder<TYdbPqWriterTestHelper>("", nullptr, TString(), setup1, tmpSrcId, true);
//! Write some data and await confirmation - just to ensure sessions are started.
helper->Write(true);
helper2->Write(true);
helper->Policy->ExpectBreakDown();
- Cerr << "===Disable dc2\n";
+ Cerr << "Disable dc2\n";
//! Leave no available DCs
setup1->DisableDataCenter("dc2");
- Cerr << "=== Wait for retries after initial dc2 shutdown\n";
+ Cerr << "Wait for retries after initial dc2 shutdown\n";
helper->Policy->WaitForRetriesSync(1);
//! Put some data inflight. It cannot be written now, but SeqNo will be assigned.
@@ -199,11 +198,11 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
auto f = helper->Write(false);
auto f2 = helper2->Write(false);
//! Enable DC1. Now writers gonna write collected data to DC1 having LastSeqNo = 10
- //! (because of data written and the very beginning), and inflight data has SeqNo assingned = 2..5,
+ //! (because of data written in the very beginning), and inflight data has SeqNo assigned = 2..5,
//! so the SeqNo shift takes place.
setup1->EnableDataCenter("dc1");
- Cerr << "=====Wait for writes to complete\n";
+ Cerr << "Wait for writes to complete\n";
f.Wait();
f2.Wait();
//! Writer1 is not used any more.
@@ -220,13 +219,13 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
helper2->Write(false);
}
f = helper2->Write(false);
- Cerr << "===Enable dc2\n";
+ Cerr << "Enable dc2\n";
setup1->EnableDataCenter("dc2");
f.Wait();
helper2->EventLoop->AllowStop();
helper2->Policy->ExpectBreakDown();
- Cerr << "===Enable dc1\n";
+ Cerr << "Enable dc1\n";
setup1->EnableDataCenter("dc1");
auto CheckSeqNo = [&] (const TString& dcName, ui64 expectedSeqNo) {
settings.PreferredCluster(dcName);
@@ -239,19 +238,19 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
};
//!check SeqNo in both DC. For writer1 We expect 14 messages in DC1
- //! (10 written initially + 4 wriiten after recoonect) and 1 message in DC2 (only initial message).
- Cerr << "===Check SeqNo writer1, dc2\n";
+ //! (10 written initially + 4 written after reconnect) and 1 message in DC2 (only initial message).
+ Cerr << "Check SeqNo writer1, dc2\n";
CheckSeqNo("dc2", 1);
- Cerr << "===Check SeqNo writer1, dc1\n";
+ Cerr << "Check SeqNo writer1, dc1\n";
CheckSeqNo("dc1", 14);
//! Check SeqNo for writer 2; Expect to have 6 messages on DC2 with MaxSeqNo = 6;
helper2 = nullptr;
settings.MessageGroupId(tmpSrcId);
- Cerr << "===Check SeqNo writer2 dc2\n";
- //!DC2 has no a ap in SeqNo since 5 messages were written ot dc 1.
- CheckSeqNo("dc2", 11);
- Cerr << "===Check SeqNo writer2 dc1\n";
+ Cerr << "Check SeqNo writer2 dc2\n";
+ //!DC2 has no shift in SeqNo since 5 messages were written to dc 1.
+ CheckSeqNo("dc2", 15);
+ Cerr << "Check SeqNo writer2 dc1\n";
CheckSeqNo("dc1", 14);
@@ -296,13 +295,19 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
} else {
UNIT_ASSERT_VALUES_EQUAL(sourceId, tmpSrcId);
auto& prevSeqNo = SeqNoByClusterSrc2[clusterName];
- if (clusterName == "dc1" || prevSeqNo != 1) {
+ if (clusterName == "dc1") {
UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1);
prevSeqNo++;
} else {
UNIT_ASSERT_VALUES_EQUAL(clusterName, "dc2");
- UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 6);
- prevSeqNo+= 6;
+ if (prevSeqNo == 0) {
+ UNIT_ASSERT_VALUES_EQUAL(seqNo, 1);
+ } else if (prevSeqNo == 1) {
+ UNIT_ASSERT_VALUES_EQUAL(seqNo, 11);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1);
+ }
+ prevSeqNo = seqNo;
}
auto& msgRemaining = MsgCountByClusterSrc2[clusterName];
UNIT_ASSERT(msgRemaining > 0);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
index 69d62dfa48..5bdad935c0 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h
@@ -78,10 +78,12 @@ public:
IRetryPolicy::TPtr retryPolicy = nullptr,
IExecutor::TPtr compressExecutor = nullptr,
const TString& preferredCluster = TString(),
- const TString& sourceId = TString()
+ const TString& sourceId = TString(),
+ bool autoSeqNo = false
)
: IClientEventLoop()
, Setup(setup)
+ , AutoSeqNo(autoSeqNo)
{
Log = Setup->GetLog();
Thread = std::make_unique<TThread>([setup, retryPolicy, compressExecutor, preferredCluster, sourceId, this]() {
@@ -101,6 +103,7 @@ public:
TMaybe<TContinuationToken> continueToken;
NThreading::TFuture<void> waitEventFuture = writer->WaitEvent();
THashMap<ui64, NThreading::TPromise<::NPersQueue::TWriteResult>> ackPromiseBySequenceNumber;
+ TDeque<NThreading::TPromise<::NPersQueue::TWriteResult>> ackPromiseQueue;
while (!MustStop) {
if (!continueToken) {
Log << TLOG_INFO << "Wait for writer event";
@@ -113,12 +116,16 @@ public:
waitEventFuture = writer->WaitEvent();
std::visit(TOverloaded {
[&](const TWriteSessionEvent::TAcksEvent& event) {
- TVector<ui64> sequenceNumbers;
for (const auto& ack : event.Acks) {
- UNIT_ASSERT(ackPromiseBySequenceNumber.contains(ack.SeqNo));
- sequenceNumbers.push_back(ack.SeqNo);
- ackPromiseBySequenceNumber[ack.SeqNo].SetValue({true, false});
- ackPromiseBySequenceNumber.erase(ack.SeqNo);
+ if (AutoSeqNo) {
+ UNIT_ASSERT(!ackPromiseQueue.empty());
+ ackPromiseQueue.front().SetValue({true, false});
+ ackPromiseQueue.pop_front();
+ } else {
+ UNIT_ASSERT(ackPromiseBySequenceNumber.contains(ack.SeqNo));
+ ackPromiseBySequenceNumber[ack.SeqNo].SetValue({true, false});
+ ackPromiseBySequenceNumber.erase(ack.SeqNo);
+ }
}
},
[&](TWriteSessionEvent::TReadyToAcceptEvent& event) {
@@ -126,11 +133,12 @@ public:
continueToken = std::move(event.ContinuationToken);
},
[&](const TSessionClosedEvent& event) {
- Log << TLOG_INFO << "===Got close event: " << event.DebugString() << Endl;
+ Log << TLOG_INFO << "Got close event: " << event.DebugString() << Endl;
if (!MayStop) {
UNIT_ASSERT(MustStop);
UNIT_ASSERT(MessageBuffer.IsEmpty());
UNIT_ASSERT(ackPromiseBySequenceNumber.empty());
+ UNIT_ASSERT(ackPromiseQueue.empty());
} else {
MustStop = true;
closed = true;
@@ -142,13 +150,24 @@ public:
if (continueToken && !MessageBuffer.IsEmpty()) {
::NPersQueue::TAcknowledgableMessage acknowledgeableMessage;
Y_VERIFY(MessageBuffer.Dequeue(acknowledgeableMessage));
- ackPromiseBySequenceNumber.emplace(acknowledgeableMessage.SequenceNumber, acknowledgeableMessage.AckPromise);
+ if (AutoSeqNo) {
+ ackPromiseQueue.emplace_back(acknowledgeableMessage.AckPromise);
+ } else {
+ ackPromiseBySequenceNumber.emplace(acknowledgeableMessage.SequenceNumber,
+ acknowledgeableMessage.AckPromise);
+ }
Y_VERIFY(continueToken);
- Log << TLOG_INFO << "Write messages with sequence numbers " << acknowledgeableMessage.SequenceNumber;
+
+ TMaybe<ui64> seqNo = Nothing();
+ if (!AutoSeqNo) {
+ seqNo = acknowledgeableMessage.SequenceNumber;
+ Log << TLOG_INFO << "[" << sourceId << "] Write messages with sequence numbers "
+ << acknowledgeableMessage.SequenceNumber;
+ }
writer->Write(
std::move(*continueToken),
std::move(acknowledgeableMessage.Value),
- acknowledgeableMessage.SequenceNumber,
+ seqNo,
acknowledgeableMessage.CreatedAt
);
continueToken = Nothing();
@@ -161,6 +180,9 @@ public:
});
Thread->Start();
}
+
+private:
+ bool AutoSeqNo;
};
struct TYdbPqTestRetryState : NYdb::NPersQueue::IRetryPolicy::IRetryState {
@@ -369,14 +391,16 @@ public:
std::shared_ptr<TLockFreeQueue<ui64>> executorQueue = nullptr,
const TString& preferredCluster = TString(),
std::shared_ptr<TPersQueueYdbSdkTestSetup> setup = nullptr,
- const TString& sourceId = TString()
+ const TString& sourceId = TString(),
+ bool autoSeqNo = false
)
: Setup(setup ? setup : std::make_shared<TPersQueueYdbSdkTestSetup>(name))
, Policy(std::make_shared<TYdbPqTestRetryPolicy>())
{
if (executorQueue)
CompressExecutor = MakeIntrusive<TYdbPqTestExecutor>(executorQueue);
- EventLoop = std::make_unique<TYDBClientEventLoop>(Setup, Policy, CompressExecutor, preferredCluster, sourceId);
+ EventLoop = std::make_unique<TYDBClientEventLoop>(Setup, Policy, CompressExecutor, preferredCluster, sourceId,
+ autoSeqNo);
}
NThreading::TFuture<::NPersQueue::TWriteResult> Write(bool doWait = false, const TString& message = TString()) {