aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2022-09-06 09:25:26 +0300
committerildar-khisam <ikhis@ydb.tech>2022-09-06 09:25:26 +0300
commit73c2d8a01f891bf51f963fe8a1bfa3c2724e6cb2 (patch)
tree06dfcd7173d5133d7d1f43db72d88dba6e40e08b
parent59bc7d40a717e50ad72ef30af2efbf595dba2e0b (diff)
downloadydb-73c2d8a01f891bf51f963fe8a1bfa3c2724e6cb2.tar.gz
add test coveringbug fix
add test
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp78
1 files changed, 57 insertions, 21 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 2e8dd7931f0..4e50f86513d 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -793,29 +793,59 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
} // Y_UNIT_TEST(TopicServiceReadBudget)
Y_UNIT_TEST(TopicServiceSimpleHappyWrites) {
- TPersQueueV1TestServer server;
- SET_LOCALS;
- MAKE_INSECURE_STUB(Ydb::Topic::V1::TopicService);
- server.EnablePQLogs({NKikimrServices::PQ_METACACHE, NKikimrServices::PQ_READ_PROXY});
- server.EnablePQLogs({NKikimrServices::KQP_PROXY}, NLog::EPriority::PRI_EMERG);
- server.EnablePQLogs({NKikimrServices::FLAT_TX_SCHEMESHARD}, NLog::EPriority::PRI_ERROR);
+ NPersQueue::TTestServer server;
+ server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR });
+ TString topic3 = "acc/topic3";
- // add 2nd partition in this topic
- pqClient->AlterTopic("rt3.dc1--acc--topic1", 2);
+ std::shared_ptr<grpc::Channel> Channel_;
+ std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_;
- // grpc::ClientContext rcontextSecond;
- // auto readStreamSecond = StubP_->StreamRead(&rcontextSecond);
- // UNIT_ASSERT(readStreamSecond);
+ {
+ Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
+ TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
+ }
+
+ {
+ Ydb::Topic::CreateTopicRequest request;
+ Ydb::Topic::CreateTopicResponse response;
+ request.set_path(TStringBuilder() << "/Root/PQ/rt3.dc1--acc--topic3");
+
+ request.mutable_partitioning_settings()->set_min_active_partitions(2);
+ request.mutable_retention_period()->set_seconds(TDuration::Days(1).Seconds());
+ (*request.mutable_attributes())["_max_partition_storage_size"] = "1000";
+ request.set_partition_write_speed_bytes_per_second(1000);
+ request.set_partition_write_burst_bytes(1000);
+
+ request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_RAW);
+ request.mutable_supported_codecs()->add_codecs(Ydb::Topic::CODEC_CUSTOM + 42);
+
+ auto consumer = request.add_consumers();
+ consumer->set_name("first-consumer");
+ consumer->set_important(false);
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->CreateTopic(&rcontext, request, &response);
+
+ UNIT_ASSERT(status.ok());
+ Ydb::Topic::CreateTopicResult res;
+ response.operation().result().UnpackTo(&res);
+ Cerr << response << "\n" << res << "\n";
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ server.AnnoyingClient->WaitTopicInit(topic3);
+ server.AnnoyingClient->AddTopic(topic3);
+ }
grpc::ClientContext rcontextWrite1;
- auto writeStream1 = StubP_->StreamWrite(&rcontextWrite1);
+ auto writeStream1 = TopicStubP_->StreamWrite(&rcontextWrite1);
UNIT_ASSERT(writeStream1);
grpc::ClientContext rcontextWrite2;
- auto writeStream2 = StubP_->StreamWrite(&rcontextWrite2);
+ auto writeStream2 = TopicStubP_->StreamWrite(&rcontextWrite2);
UNIT_ASSERT(writeStream2);
- auto readStream = StubP_ -> StreamRead(&rcontext);
+ grpc::ClientContext rcontext;
+ auto readStream = TopicStubP_ -> StreamRead(&rcontext);
UNIT_ASSERT(readStream);
// init write session 1
@@ -823,7 +853,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;
- req.mutable_init_request()->set_path("acc/topic1");
+ req.mutable_init_request()->set_path("acc/topic3");
req.mutable_init_request()->set_producer_id("A");
req.mutable_init_request()->set_message_group_id("A");
@@ -838,7 +868,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.Clear();
auto* write = req.mutable_write_request();
- write->set_codec(1);
+ write->set_codec(Ydb::Topic::CODEC_RAW);
for (ui32 i = 0; i < 10; ++i) {
auto* msg = write->add_messages();
@@ -850,6 +880,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
}
+ UNIT_ASSERT(writeStream1->Read(&resp));
+ Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kWriteResponse);
}
// init write session 2
@@ -857,7 +890,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;
- req.mutable_init_request()->set_path("acc/topic1");
+ req.mutable_init_request()->set_path("acc/topic3");
req.mutable_init_request()->set_producer_id("B");
req.mutable_init_request()->set_message_group_id("B");
@@ -872,7 +905,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
req.Clear();
auto* write = req.mutable_write_request();
- write->set_codec(1);
+ write->set_codec(Ydb::Topic::CODEC_CUSTOM + 42);
for (ui32 i = 0; i < 10; ++i) {
auto* msg = write->add_messages();
@@ -884,6 +917,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
if (!writeStream2->Write(req)) {
ythrow yexception() << "write fail";
}
+ UNIT_ASSERT(writeStream2->Read(&resp));
+ Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
+ UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamWriteMessage::FromServer::kWriteResponse);
}
// init 1st read session
@@ -891,7 +927,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Ydb::Topic::StreamReadMessage::FromClient req;
Ydb::Topic::StreamReadMessage::FromServer resp;
- req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic1");
+ req.mutable_init_request()->add_topics_read_settings()->set_path("acc/topic3");
req.mutable_init_request()->set_consumer("user");
@@ -922,7 +958,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
//lock partition
UNIT_ASSERT(readStream->Read(&resp));
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
- UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3");
partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id());
assignId = resp.start_partition_session_request().partition_session().partition_session_id();
@@ -935,7 +971,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
resp.Clear();
UNIT_ASSERT(readStream->Read(&resp));
UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest);
- UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic1");
+ UNIT_ASSERT(resp.start_partition_session_request().partition_session().path() == "acc/topic3");
partition_ids.push_back(resp.start_partition_session_request().partition_session().partition_id());
std::sort(partition_ids.begin(), partition_ids.end());