diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-10-13 17:48:34 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-10-13 18:10:15 +0300 |
commit | cdbf79cb4e1fbf3be2451b73ef662732ca412c23 (patch) | |
tree | 27a95688fb004c72bcd2a53e2e1f79650c5921cd | |
parent | 9ad3ba816f192c0a5609a6570145b014c840f540 (diff) | |
download | ydb-cdbf79cb4e1fbf3be2451b73ef662732ca412c23.tar.gz |
fix update token test
fix test scheme
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 38 |
1 files changed, 16 insertions, 22 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index a1e1a33473..816779ecba 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -750,7 +750,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); // send some reads req.Clear(); - req.mutable_read_request()->set_bytes_size(2_KB); + req.mutable_read_request()->set_bytes_size(200_MB); if (!readStream->Write(req)) { ythrow yexception() << "write fail"; } @@ -778,21 +778,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { // write to partition in 1 session auto driver = pqClient->GetDriver(); - { - auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"}); - for (int i = 1; i < 1000; ++i) { - bool res = writer->Write(TString(2_KB, 'x'), i); - UNIT_ASSERT(res); - } - bool res = writer->Close(TDuration::Seconds(10)); - UNIT_ASSERT(res); - } + auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"}); //check read results Ydb::Topic::StreamReadMessage::FromClient req; Ydb::Topic::StreamReadMessage::FromServer resp; + int seqNo = 1; for (ui32 i = 0; i < 10; ++i) { + bool result = writer->Write(TString(2_KB, 'x'), seqNo++); + UNIT_ASSERT(result); + resp.Clear(); UNIT_ASSERT(readStream->Read(&resp)); Cerr << "===Expect ReadResponse, got " << resp << "\n"; @@ -811,15 +807,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Cerr << "===Expect UpdateTokenResponse, got response: " << resp.ShortDebugString() << Endl; UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp); - - req.Clear(); - req.mutable_read_request()->set_bytes_size(2208); - if (!readStream->Write(req)) { - ythrow yexception() << "write fail"; - } } { + bool result = writer->Write(TString(2_KB, 'x'), seqNo++); + UNIT_ASSERT(result); + resp.Clear(); UNIT_ASSERT(readStream->Read(&resp)); Cerr << "===Expect ReadResponse, got " << resp << "\n"; @@ -837,11 +830,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Cerr << "===Expect UpdateTokenResponse, got response: " << resp.ShortDebugString() << Endl; UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp); - req.Clear(); - req.mutable_read_request()->set_bytes_size(2208); - if (!readStream->Write(req)) { - ythrow yexception() << "write fail"; - } + result = writer->Write(TString(2_KB, 'x'), seqNo++); + UNIT_ASSERT(result); // why successful? resp.Clear(); @@ -856,6 +846,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_C(resp.status() == Ydb::StatusIds::UNAUTHORIZED, resp); } + + bool res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } Y_UNIT_TEST(TopicServiceCommitOffset) { @@ -6638,7 +6632,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { readParallel2.join(); auto diff = (TInstant::Now() - startTime).Seconds(); UNIT_ASSERT(diff <= (timeNeededToWaitQuotaAfterReadToEnd) + 1); - + readToEnd(serverWithInflightLimit, 1, 3, "", 1_MB); /* Here the reading quota is used. The two threads below will wait for the quota, and when execute consistently, |