summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <[email protected]>2023-02-03 12:56:12 +0300
committerildar-khisam <[email protected]>2023-02-03 12:56:12 +0300
commitc78ae4767be77cdc9b24922c8957670834d6c075 (patch)
treedac86d7f647dee61b59b7b4c424be59b53f4e605
parent64f901bd6dbfaa97d04ad01fed8b8538cd91e324 (diff)
fix and test
fix and test for
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp2
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp141
2 files changed, 135 insertions, 8 deletions
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index ebe6f3764df..141114356cc 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -248,7 +248,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
if (const auto token = request.update_token_request().token()) { // TODO: refresh token here
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvAuth(token));
}
- break;
+ return (void)ReadFromStreamOrDie(ctx);
}
default: {
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 51066713fef..79e10f13dab 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -660,8 +660,135 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp);
}
+
+ // write and read some more
+ {
+ auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source");
+ for (int i = 17; i < 37; ++i) {
+ bool res = writer->Write("valuevaluevalue" + ToString(i), i);
+ UNIT_ASSERT(res);
+ }
+ bool res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+
+ Ydb::Topic::StreamReadMessage::FromClient req;
+ Ydb::Topic::StreamReadMessage::FromServer resp;
+
+ req.mutable_read_request()->set_bytes_size(1_MB);
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ }
+ // expect answer to read
+ resp.Clear();
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "Got response " << resp << "\n";
+ UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
+ }
+
+
+ Y_UNIT_TEST(StreamReadManyUpdateTokenAndRead) {
+ TPersQueueV1TestServer server;
+ SET_LOCALS;
+ MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
+ server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY });
+ server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG);
+ server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR);
+
+ auto readStream = StubP_->StreamRead(&rcontext);
+ UNIT_ASSERT(readStream);
+
+ // init read session
+ {
+ Ydb::Topic::StreamReadMessage::FromClient req;
+ Ydb::Topic::StreamReadMessage::FromServer resp;
+
+ req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1");
+
+ req.mutable_init_request()->set_consumer("user");
+
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse);
+ // send some reads
+ req.Clear();
+ req.mutable_read_request()->set_bytes_size(2_KB);
+ // for (ui32 i = 0; i < 10; ++i) {
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ // }
+ }
+
+ // await and confirm CreatePartitionStreamRequest from server
+ i64 assignId = 0;
+ {
+ Ydb::Topic::StreamReadMessage::FromClient req;
+ Ydb::Topic::StreamReadMessage::FromServer resp;
+
+ //lock partition
+ UNIT_ASSERT(readStream->Read(&resp));
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
+ UNIT_ASSERT_VALUES_EQUAL(resp.start_partition_session_request().partition_session().path(), "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0);
+
+ assignId = resp.start_partition_session_request().partition_session().partition_session_id();
+ req.Clear();
+ req.mutable_start_partition_session_response()->set_partition_session_id(assignId);
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ }
+
+ // write to partition in 1 session
+ auto driver = pqClient->GetDriver();
+ {
+ auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"});
+ for (int i = 1; i < 1000; ++i) {
+ bool res = writer->Write(TString(2_KB, 'x'), i);
+ UNIT_ASSERT(res);
+ }
+ bool res = writer->Close(TDuration::Seconds(10));
+ UNIT_ASSERT(res);
+ }
+
+ //check read results
+ Ydb::Topic::StreamReadMessage::FromClient req;
+ Ydb::Topic::StreamReadMessage::FromServer resp;
+
+ for (ui32 i = 0; i < 10; ++i) {
+ resp.Clear();
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Expect ReadResponse, got " << resp << "\n";
+ UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp);
+
+ // send update token request, await response
+ const TString token = TString("test_user_" + ToString(i) + "@") + BUILTIN_ACL_DOMAIN;;
+ req.Clear();
+ resp.Clear();
+ req.mutable_update_token_request()->set_token(token);
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+
+ UNIT_ASSERT(readStream->Read(&resp));
+ Cerr << "===Expect UpdateTokenResponse, got response: " << resp.ShortDebugString() << Endl;
+
+ UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp);
+
+ req.Clear();
+ req.mutable_read_request()->set_bytes_size(2208);
+ if (!readStream->Write(req)) {
+ ythrow yexception() << "write fail";
+ }
+ }
}
+
+
Y_UNIT_TEST(TopicServiceReadBudget) {
TPersQueueV1TestServer server;
SET_LOCALS;
@@ -2577,27 +2704,27 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_FAIL("incorrect mode");
}
-
+
Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Compressed) {
WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, true, 1_MB - 1_KB, 1050);
}
-
+
Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Compressed) {
WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, true, 1_MB - 1_KB, 1050);
}
-
+
Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Compressed) {
WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, true, 1_MB - 1_KB, 1050);
}
-
+
Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Uncompressed) {
WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB);
}
-
+
Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Uncompressed) {
WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB);
}
-
+
Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Uncompressed) {
WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB);
}
@@ -5702,7 +5829,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(partId == 1 || partId == 3);
}
}
-
+
Y_UNIT_TEST(LOGBROKER_7820) {
//
// 700 messages of 2000 characters are sent in the test