diff options
author | ildar-khisam <[email protected]> | 2023-02-03 12:56:12 +0300 |
---|---|---|
committer | ildar-khisam <[email protected]> | 2023-02-03 12:56:12 +0300 |
commit | c78ae4767be77cdc9b24922c8957670834d6c075 (patch) | |
tree | dac86d7f647dee61b59b7b4c424be59b53f4e605 | |
parent | 64f901bd6dbfaa97d04ad01fed8b8538cd91e324 (diff) |
fix and test
fix and test for
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 141 |
2 files changed, 135 insertions, 8 deletions
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index ebe6f3764df..141114356cc 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -248,7 +248,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF if (const auto token = request.update_token_request().token()) { // TODO: refresh token here ctx.Send(ctx.SelfID, new TEvPQProxy::TEvAuth(token)); } - break; + return (void)ReadFromStreamOrDie(ctx); } default: { diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 51066713fef..79e10f13dab 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -660,8 +660,135 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp); } + + // write and read some more + { + auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source"); + for (int i = 17; i < 37; ++i) { + bool res = writer->Write("valuevaluevalue" + ToString(i), i); + UNIT_ASSERT(res); + } + bool res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + req.mutable_read_request()->set_bytes_size(1_MB); + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + } + // expect answer to read + resp.Clear(); + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "Got response " << resp << "\n"; + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); + } + + + Y_UNIT_TEST(StreamReadManyUpdateTokenAndRead) { + TPersQueueV1TestServer server; + SET_LOCALS; + MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService); + server.EnablePQLogs({ NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY }); + server.EnablePQLogs({ NKikimrServices::KQP_PROXY }, NLog::EPriority::PRI_EMERG); + server.EnablePQLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD }, NLog::EPriority::PRI_ERROR); + + auto readStream = StubP_->StreamRead(&rcontext); + UNIT_ASSERT(readStream); + + // init read session + { + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1"); + + req.mutable_init_request()->set_consumer("user"); + + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); + // send some reads + req.Clear(); + req.mutable_read_request()->set_bytes_size(2_KB); + // for (ui32 i = 0; i < 10; ++i) { + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + // } + } + + // await and confirm CreatePartitionStreamRequest from server + i64 assignId = 0; + { + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + //lock partition + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); + UNIT_ASSERT_VALUES_EQUAL(resp.start_partition_session_request().partition_session().path(), "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().partition_id() == 0); + + assignId = resp.start_partition_session_request().partition_session().partition_session_id(); + req.Clear(); + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + } + + // write to partition in 1 session + auto driver = pqClient->GetDriver(); + { + auto writer = CreateSimpleWriter(*driver, "acc/topic1", "source", /*partitionGroup=*/{}, /*codec=*/{"raw"}); + for (int i = 1; i < 1000; ++i) { + bool res = writer->Write(TString(2_KB, 'x'), i); + UNIT_ASSERT(res); + } + bool res = writer->Close(TDuration::Seconds(10)); + UNIT_ASSERT(res); + } + + //check read results + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + for (ui32 i = 0; i < 10; ++i) { + resp.Clear(); + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Expect ReadResponse, got " << resp << "\n"; + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); + + // send update token request, await response + const TString token = TString("test_user_" + ToString(i) + "@") + BUILTIN_ACL_DOMAIN;; + req.Clear(); + resp.Clear(); + req.mutable_update_token_request()->set_token(token); + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + + UNIT_ASSERT(readStream->Read(&resp)); + Cerr << "===Expect UpdateTokenResponse, got response: " << resp.ShortDebugString() << Endl; + + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kUpdateTokenResponse, resp); + + req.Clear(); + req.mutable_read_request()->set_bytes_size(2208); + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + } } + + Y_UNIT_TEST(TopicServiceReadBudget) { TPersQueueV1TestServer server; SET_LOCALS; @@ -2577,27 +2704,27 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_FAIL("incorrect mode"); } - + Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Compressed) { WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, true, 1_MB - 1_KB, 1050); } - + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Compressed) { WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, true, 1_MB - 1_KB, 1050); } - + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Compressed) { WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, true, 1_MB - 1_KB, 1050); } - + Y_UNIT_TEST(WhenTheTopicIsDeletedBeforeDataIsDecompressed_Uncompressed) { WhenTheTopicIsDeletedImpl(AFTER_WRITES, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB); } - + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterDecompressingTheData_Uncompressed) { WhenTheTopicIsDeletedImpl(AFTER_START_TASKS, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB); } - + Y_UNIT_TEST(WhenTheTopicIsDeletedAfterReadingTheData_Uncompressed) { WhenTheTopicIsDeletedImpl(AFTER_DOREAD, 1_MB + 1, false, 1_MB - 1_KB, 1_MB - 1_KB); } @@ -5702,7 +5829,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(partId == 1 || partId == 3); } } - + Y_UNIT_TEST(LOGBROKER_7820) { // // 700 messages of 2000 characters are sent in the test |