diff options
author | ildar-khisam <ikhis@ydb.tech> | 2022-09-06 09:25:26 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2022-09-06 09:25:26 +0300 |
commit | 73c2d8a01f891bf51f963fe8a1bfa3c2724e6cb2 (patch) | |
tree | 06dfcd7173d5133d7d1f43db72d88dba6e40e08b | |
parent | 59bc7d40a717e50ad72ef30af2efbf595dba2e0b (diff) | |
download | ydb-73c2d8a01f891bf51f963fe8a1bfa3c2724e6cb2.tar.gz |
add test coveringbug fix
add test
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 78 |
1 files changed, 57 insertions, 21 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 2e8dd7931f0..4e50f86513d 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -793,29 +793,59 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } // Y_UNIT_TEST(TopicServiceReadBudget) Y_UNIT_TEST(TopicServiceSimpleHappyWrites) { - TPersQueueV1TestServer server; - SET_LOCALS; - MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService); - server.EnablePQLogs({NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY}); - server.EnablePQLogs({NKikimrServices::KQP_PROXY}, NLog::EPriority::PRI_EMERG); - server.EnablePQLogs({NKikimrServices::FLAT_TX_SCHEMESHARD}, NLog::EPriority::PRI_ERROR); + NPersQueue::TTestServer server; + server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR }); + TString topic3 = "acc/topic3"; - // add 2nd partition in this topic - pqClient->AlterTopic("rt3.dc1--acc--topic1", 2); + std::shared_ptr<grpc::Channel> Channel_; + std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_; - // grpc::ClientContext rcontextSecond; - // auto readStreamSecond = StubP_->StreamRead(&rcontextSecond); - // UNIT_ASSERT(readStreamSecond); + { + Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials()); + TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + } + + { + Ydb::Topic::CreateTopicRequest request; + Ydb::Topic::CreateTopicResponse response; + request.set_path(TStringBuilder() << "/Root/PQ/rt3.dc1--acc--topic3"); + + request.mutable_partitioning_settings()->set_min_active_partitions(2); + request.mutable_retention_period()->set_seconds(TDuration::Days(1).Seconds()); + (*request.mutable_attributes())["_max_partition_storage_size"] = "1000"; + request.set_partition_write_speed_bytes_per_second(1000); + request.set_partition_write_burst_bytes(1000); + + request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW); + request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_CUSTOM + 42); + + auto consumer = request.add_consumers(); + consumer->set_name("first-consumer"); + consumer->set_important(false); + grpc::ClientContext rcontext; + + auto status = TopicStubP_->CreateTopic(&rcontext, request, &response); + + UNIT_ASSERT(status.ok()); + Ydb::Topic::CreateTopicResult res; + response.operation().result().UnpackTo(&res); + Cerr << response << "\n" << res << "\n"; + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + server.AnnoyingClient->WaitTopicInit(topic3); + server.AnnoyingClient->AddTopic(topic3); + } grpc::ClientContext rcontextWrite1; - auto writeStream1 = StubP_->StreamWrite(&rcontextWrite1); + auto writeStream1 = TopicStubP_->StreamWrite(&rcontextWrite1); UNIT_ASSERT(writeStream1); grpc::ClientContext rcontextWrite2; - auto writeStream2 = StubP_->StreamWrite(&rcontextWrite2); + auto writeStream2 = TopicStubP_->StreamWrite(&rcontextWrite2); UNIT_ASSERT(writeStream2); - auto readStream = StubP_ -> StreamRead(&rcontext); + grpc::ClientContext rcontext; + auto readStream = TopicStubP_ -> StreamRead(&rcontext); UNIT_ASSERT(readStream); // init write session 1 @@ -823,7 +853,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Ydb::Topic::StreamWriteMessage::FromClient req; Ydb::Topic::StreamWriteMessage::FromServer resp; - req.mutable_init_request()->set_path("acc/topic1"); + req.mutable_init_request()->set_path("acc/topic3"); req.mutable_init_request()->set_producer_id("A"); req.mutable_init_request()->set_message_group_id("A"); @@ -838,7 +868,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.Clear(); auto* write = req.mutable_write_request(); - write->set_codec(1); + write->set_codec(Ydb::Topic::CODEC_RAW); for (ui32 i = 0; i < 10; ++i) { auto* msg = write->add_messages(); @@ -850,6 +880,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { if (!writeStream1->Write(req)) { ythrow yexception() << "write fail"; } + UNIT_ASSERT(writeStream1->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kWriteResponse); } // init write session 2 @@ -857,7 +890,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Ydb::Topic::StreamWriteMessage::FromClient req; Ydb::Topic::StreamWriteMessage::FromServer resp; - req.mutable_init_request()->set_path("acc/topic1"); + req.mutable_init_request()->set_path("acc/topic3"); req.mutable_init_request()->set_producer_id("B"); req.mutable_init_request()->set_message_group_id("B"); @@ -872,7 +905,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { req.Clear(); auto* write = req.mutable_write_request(); - write->set_codec(1); + write->set_codec(Ydb::Topic::CODEC_CUSTOM + 42); for (ui32 i = 0; i < 10; ++i) { auto* msg = write->add_messages(); @@ -884,6 +917,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { if (!writeStream2->Write(req)) { ythrow yexception() << "write fail"; } + UNIT_ASSERT(writeStream2->Read(&resp)); + Cerr << "===Got response: " << resp.ShortDebugString() << Endl; + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kWriteResponse); } // init 1st read session @@ -891,7 +927,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Ydb::Topic::StreamReadMessage::FromClient req; Ydb::Topic::StreamReadMessage::FromServer resp; - req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1"); + req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic3"); req.mutable_init_request()->set_consumer("user"); @@ -922,7 +958,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { //lock partition UNIT_ASSERT(readStream->Read(&resp)); UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); - UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3"); partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id()); assignId = resp.start_partition_session_request().partition_session().partition_session_id(); @@ -935,7 +971,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { resp.Clear(); UNIT_ASSERT(readStream->Read(&resp)); UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest); - UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic1"); + UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3"); partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id()); std::sort(partition_ids.begin(), partition_ids.end()); |