aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-10-13 17:48:34 +0300
committerildar-khisam <ikhis@ydb.tech>2023-10-13 18:10:15 +0300
commitcdbf79cb4e1fbf3be2451b73ef662732ca412c23 (patch)
tree27a95688fb004c72bcd2a53e2e1f79650c5921cd
parent9ad3ba816f192c0a5609a6570145b014c840f540 (diff)
downloadydb-cdbf79cb4e1fbf3be2451b73ef662732ca412c23.tar.gz
fix update token test
fix test scheme
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp38
1 files changed, 16 insertions, 22 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index a1e1a33473..816779ecba 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -750,7 +750,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse);
// send some reads
req.Clear();
- req.mutable_read_request()->set_bytes_size(2_KB);
+ req.mutable_read_request()->set_bytes_size(200_MB);
if (!readStream->Write(req)) {
ythrow yexception() << "write fail";
}
@@ -778,21 +778,17 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
// write to partition in 1 session
auto driver = pqClient->GetDriver();
- {
- auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"});
- for (int i = 1; i < 1000; ++i) {
- bool res = writer->Write(TString(2_KB, 'x'), i);
- UNIT_ASSERT(res);
- }
- bool res = writer->Close(TDuration::Seconds(10));
- UNIT_ASSERT(res);
- }
+ auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"});
//check read results
Ydb::Topic::StreamReadMessage::FromClient req;
Ydb::Topic::StreamReadMessage::FromServer resp;
+ int seqNo = 1;
for (ui32 i = 0; i < 10; ++i) {
+ bool result = writer->Write(TString(2_KB, 'x'), seqNo++);
+ UNIT_ASSERT(result);
+
resp.Clear();
UNIT_ASSERT(readStream->Read(&resp));
Cerr << "===Expect ReadResponse, got " << resp << "\n";
@@ -811,15 +807,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << "===Expect UpdateTokenResponse, got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp);
-
- req.Clear();
- req.mutable_read_request()->set_bytes_size(2208);
- if (!readStream->Write(req)) {
- ythrow yexception() << "write fail";
- }
}
{
+ bool result = writer->Write(TString(2_KB, 'x'), seqNo++);
+ UNIT_ASSERT(result);
+
resp.Clear();
UNIT_ASSERT(readStream->Read(&resp));
Cerr << "===Expect ReadResponse, got " << resp << "\n";
@@ -837,11 +830,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Cerr << "===Expect UpdateTokenResponse, got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp);
- req.Clear();
- req.mutable_read_request()->set_bytes_size(2208);
- if (!readStream->Write(req)) {
- ythrow yexception() << "write fail";
- }
+ result = writer->Write(TString(2_KB, 'x'), seqNo++);
+ UNIT_ASSERT(result);
// why successful?
resp.Clear();
@@ -856,6 +846,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT_C(resp.status() == Ydb::StatusIds::UNAUTHORIZED, resp);
}
+
+ bool res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+
}
Y_UNIT_TEST(TopicServiceCommitOffset) {
@@ -6638,7 +6632,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
readParallel2.join();
auto diff = (TInstant::Now() - startTime).Seconds();
UNIT_ASSERT(diff <= (timeNeededToWaitQuotaAfterReadToEnd) + 1);
-
+
readToEnd(serverWithInflightLimit, 1, 3, "", 1_MB);
/*
Here the reading quota is used. The two threads below will wait for the quota, and when execute consistently,