diff options
| -rw-r--r-- | ydb/core/client/flat_ut_client.h | 2 | ||||
| -rw-r--r-- | ydb/core/client/locks_ut.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/testlib/test_client.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/testlib/test_client.h | 10 | ||||
| -rw-r--r-- | ydb/core/testlib/test_pq_client.h | 249 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 28 |
6 files changed, 121 insertions, 184 deletions
diff --git a/ydb/core/client/flat_ut_client.h b/ydb/core/client/flat_ut_client.h index 07f5b30cc6b..52608875aea 100644 --- a/ydb/core/client/flat_ut_client.h +++ b/ydb/core/client/flat_ut_client.h @@ -55,7 +55,7 @@ public: TAutoPtr<NBus::TBusMessage> reply; NBus::EMessageStatus msgStatus = SendWhenReady(request, reply); UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); - Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Release()); } diff --git a/ydb/core/client/locks_ut.cpp b/ydb/core/client/locks_ut.cpp index 11b76611e30..2617461edaf 100644 --- a/ydb/core/client/locks_ut.cpp +++ b/ydb/core/client/locks_ut.cpp @@ -43,7 +43,7 @@ public: TAutoPtr<NBus::TBusMessage> reply; NBus::EMessageStatus msgStatus = SendWhenReady(request, reply); UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); - Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Release()); } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index ced40a8b5f8..b9073e5c279 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1236,7 +1236,7 @@ namespace Tests { SendAndWaitCompletion(request, reply); #ifndef NDEBUG - Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; #endif return reply; } @@ -1353,7 +1353,7 @@ namespace Tests { TAutoPtr<NBus::TBusMessage> reply; NBus::EMessageStatus msgStatus = SendAndWaitCompletion(request, reply); #ifndef NDEBUG - Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; #endif UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record; @@ -1370,7 +1370,7 @@ namespace Tests { TAutoPtr<NBus::TBusMessage> reply; NBus::EMessageStatus msgStatus = SendAndWaitCompletion(request, reply); #ifndef NDEBUG - Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; #endif UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record; @@ -1836,7 +1836,7 @@ namespace Tests { TAutoPtr<NBus::TBusMessage> reply; auto msgStatus = WaitCompletion(descr.GetCreateTxId(), descr.GetSchemeshardId(), descr.GetPathId(), reply, timeout); #ifndef NDEBUG - Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; #endif UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record; @@ -1853,7 +1853,7 @@ namespace Tests { NBus::EMessageStatus msgStatus = SendWhenReady(request, reply); UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); - Cerr << "TClient::Ls response: " << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cerr << "TClient::Ls response: " << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Release()); } @@ -2008,7 +2008,7 @@ namespace Tests { NBus::EMessageStatus msgStatus = SendWhenReady(readRequest, reply); #ifndef NDEBUG - Cerr << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; + Cerr << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl; #endif UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record; @@ -2039,7 +2039,7 @@ namespace Tests { NBus::EMessageStatus msgStatus = SendWhenReady(deleteRequest, replyDelete); #ifndef NDEBUG - Cout << PrintResult<NMsgBusProxy::TBusResponse>(replyDelete.Get()) << Endl; + Cout << PrintToString<NMsgBusProxy::TBusResponse>(replyDelete.Get()) << Endl; #endif UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK); const NKikimrClient::TResponse &responseDelete = dynamic_cast<NMsgBusProxy::TBusResponse *>(replyDelete.Get())->Record; diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index b535eb97d9e..4dc59109e90 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -95,6 +95,7 @@ namespace Tests { ui16 Port; ui16 GrpcPort = 0; + int GrpcMaxMessageSize = 0; // 0 - default (4_MB), -1 - no limit NKikimrProto::TAuthConfig AuthConfig; NKikimrPQ::TPQConfig PQConfig; NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig; @@ -141,6 +142,7 @@ namespace Tests { std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory; TServerSettings& SetGrpcPort(ui16 value) { GrpcPort = value; return *this; } + TServerSettings& SetGrpcMaxMessageSize(int value) { GrpcMaxMessageSize = value; return *this; } TServerSettings& SetSupportsRedirect(bool value) { SupportsRedirect = value; return *this; } TServerSettings& SetTracePath(const TString& value) { TracePath = value; return *this; } TServerSettings& SetDomain(ui32 value) { Domain = value; return *this; } @@ -486,7 +488,7 @@ namespace Tests { THolder<NKesus::TEvKesus::TEvGetConfigResult> GetKesusConfig(TTestActorRuntime* runtime, const TString& kesusPath); protected: - TString PrintResult(const ::google::protobuf::Message& msg, size_t maxSz = 1000) { + TString PrintToString(const ::google::protobuf::Message& msg, size_t maxSz = 1000) { TString s; ::google::protobuf::TextFormat::PrintToString(msg, &s); if (s.size() > maxSz) { @@ -497,9 +499,9 @@ namespace Tests { } template <class TMsg> - TString PrintResult(NBus::TBusMessage* msg, size_t maxSz = 1000) { - auto res = dynamic_cast<TMsg*>(msg); - return PrintResult(res->Record, maxSz); + TString PrintToString(const NBus::TBusMessage* msg, size_t maxSz = 1000) { + auto res = dynamic_cast<const TMsg*>(msg); + return PrintToString(res->Record, maxSz); } // Waits for kikimr server to become ready diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index b8b1975d11b..c742c8aedea 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -459,11 +459,6 @@ struct TRequestDescribePQ { } }; -enum class ETransport { - MsgBus, - GRpc -}; - struct TPQTestClusterInfo { TString Balancer; bool Enabled; @@ -487,9 +482,10 @@ private: const ui16 GRpcPort; NClient::TKikimr Kikimr; THolder<NYdb::TDriver> Driver; + std::unique_ptr<NKikimrClient::TGRpcServer::Stub> Stub; + ui64 TopicsVersion = 0; bool UseConfigTables = true; - public: void RunYqlSchemeQuery(TString query, bool expectSuccess = true) { auto tableClient = NYdb::NTable::TTableClient(*Driver); @@ -527,21 +523,29 @@ public: } - TFlatMsgBusPQClient( - const Tests::TServerSettings& settings, ui16 grpc, TMaybe<TString> databaseName = Nothing() - ) + TFlatMsgBusPQClient(const Tests::TServerSettings& settings, ui16 grpc, TMaybe<TString> databaseName = Nothing()) : TFlatMsgBusClient(settings) , Settings(settings) , GRpcPort(grpc) , Kikimr(GetClientConfig()) { + TString endpoint = TStringBuilder() << "localhost:" << GRpcPort; auto driverConfig = NYdb::TDriverConfig() - .SetEndpoint(TStringBuilder() << "localhost:" << GRpcPort) - .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); - if (databaseName) { + .SetEndpoint(endpoint) + .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + if (databaseName) driverConfig.SetDatabase(*databaseName); - } Driver.Reset(MakeHolder<NYdb::TDriver>(driverConfig)); + + grpc::ChannelArguments args; + if (settings.GrpcMaxMessageSize != 0) + { + args.SetMaxReceiveMessageSize(settings.GrpcMaxMessageSize); + args.SetMaxSendMessageSize(settings.GrpcMaxMessageSize); + } + auto channel = grpc::CreateCustomChannel(endpoint, grpc::InsecureChannelCredentials(), args); + + Stub = NKikimrClient::TGRpcServer::NewStub(channel); } ~TFlatMsgBusPQClient() { @@ -735,20 +739,7 @@ public: NKikimrClient::TPersQueueRequest request; request.MutableMetaRequest()->MutableCmdGetTopicMetadata()->AddTopic(name); - Cerr << "RequestTopicMetadata request: " << name << " to server " << Client->GetConfig().Ip << ":" << Client->GetConfig().Port << Endl; - - NKikimrClient::TResponse response; - auto channel = grpc::CreateChannel("localhost:" + ToString(GRpcPort), grpc::InsecureChannelCredentials()); - auto stub(NKikimrClient::TGRpcServer::NewStub(channel)); - grpc::ClientContext context; - auto status = stub->PersQueueRequest(&context, request, &response); - - Cerr << "RequestTopicMetadata response: " << PrintResult(response) << Endl; - - UNIT_ASSERT(status.ok()); - UNIT_ASSERT(response.HasErrorCode()); - - return response; + return CallPersQueueGRPC(request); } ui32 GetTopicVersionFromMetadata(const TString& name, ui64 cacheSize = 0) @@ -819,19 +810,20 @@ public: return response.GetErrorCode() == (ui32)NPersQueue::NErrorCode::UNKNOWN_TOPIC; } + NKikimrClient::TResponse CallPersQueueGRPC(const NKikimrClient::TPersQueueRequest& request, ui64 maxPrintSize = 1000) { + Cerr << "CallPersQueueGRPC request to " << Client->GetConfig().Ip << ":" << Client->GetConfig().Port << "\n" + << PrintToString(request, maxPrintSize) << Endl; - const NMsgBusProxy::TBusResponse* SendAndGetReply(TAutoPtr<NMsgBusProxy::TBusPersQueue> request, - TAutoPtr<NBus::TBusMessage>& reply, ui64 maxPrintSize = 0) { - NBus::EMessageStatus status = SyncCall(request, reply); - TString msgStr; - UNIT_ASSERT_VALUES_EQUAL(status, NBus::MESSAGE_OK); - if (maxPrintSize) { - msgStr = PrintResult<NMsgBusProxy::TBusResponse>(reply.Get(), maxPrintSize); - } else { - msgStr = PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()); - } - Cerr << msgStr << Endl; - return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Get()); + NKikimrClient::TResponse response; + grpc::ClientContext context; + grpc::Status status = Stub->PersQueueRequest(&context, request, &response); + + UNIT_ASSERT_C(status.ok(), status.error_message()); + UNIT_ASSERT(response.HasErrorCode()); + + Cerr << "CallPersQueueGRPC response:\n" << PrintToString(response, maxPrintSize) << Endl; + + return response; } void CreateConsumer(const TString& oldName) { @@ -902,18 +894,12 @@ public: } while (true); } - void CreateTopic(const TRequestCreatePQ& createRequest, bool doWait = true - ) { + void CreateTopic(const TRequestCreatePQ& createRequest, bool doWait = true) { const TInstant start = TInstant::Now(); - THolder<NMsgBusProxy::TBusPersQueue> request = createRequest.GetRequest(); - ui32 prevVersion = GetTopicVersionFromMetadata(createRequest.Topic); - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request, reply); - UNIT_ASSERT(response); - UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK, - TStringBuilder() << "proxy failure: " << response->Record.DebugString()); + + CallPersQueueGRPC(createRequest.GetRequest()->Record); AddTopic(createRequest.Topic); while (doWait && GetTopicVersionFromPath(createRequest.Topic) != prevVersion + 1) { @@ -976,15 +962,11 @@ public: ) { Y_VERIFY(name.StartsWith("rt3.")); TRequestAlterPQ requestDescr(name, nParts, cacheSize, lifetimeS, fillPartitionConfig, mirrorFrom); - THolder<NMsgBusProxy::TBusPersQueue> request = requestDescr.GetRequest(); + THolder<NMsgBusProxy::TBusPersQueue> alterRequest = requestDescr.GetRequest(); ui32 prevVersion = GetTopicVersionFromMetadata(name); - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK, - response->Record.DebugString().c_str()); + CallPersQueueGRPC(alterRequest->Record); const TInstant start = TInstant::Now(); AlterTopic(); @@ -1013,13 +995,9 @@ public: ) { Y_VERIFY(name.StartsWith("rt3.")); - THolder<NMsgBusProxy::TBusPersQueue> request = TRequestDeletePQ{name}.GetRequest(); + THolder<NMsgBusProxy::TBusPersQueue> deleteRequest = TRequestDeletePQ{name}.GetRequest(); - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)expectedStatus, - "proxy failure"); + CallPersQueueGRPC(deleteRequest->Record); // wait for drop completion if (expectedStatus == NPersQueue::NErrorCode::OK) { @@ -1042,52 +1020,37 @@ public: } } - TString GetOwnership(const TRequestGetOwnership& getOwnership, - NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK) { - - THolder<NMsgBusProxy::TBusPersQueue> request = getOwnership.GetRequest(); - - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), expectedStatus, - "proxy failure"); + TString GetOwnership(const TRequestGetOwnership& getOwnership, NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK) { + auto response = CallPersQueueGRPC(getOwnership.GetRequest()->Record); if (expectedStatus == NMsgBusProxy::MSTATUS_OK) { - UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK, - "write failure"); - return response->Record.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); + UNIT_ASSERT_VALUES_EQUAL_C((ui32)response.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK, "write failure"); + return response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); } return ""; } - void ChooseProxy(ETransport transport = ETransport::MsgBus) { - THolder<NMsgBusProxy::TBusChooseProxy> request = MakeHolder<NMsgBusProxy::TBusChooseProxy>(); + void ChooseProxy() { + NKikimrClient::TChooseProxyRequest request; NKikimrClient::TResponse response; - if (transport == ETransport::GRpc) { - auto channel = grpc::CreateChannel("localhost:"+ToString(GRpcPort), grpc::InsecureChannelCredentials()); - auto stub(NKikimrClient::TGRpcServer::NewStub(channel)); - grpc::ClientContext context; - auto status = stub->ChooseProxy(&context, request->Record, &response); + Cerr << "ChooseProxy request to server " << Client->GetConfig().Ip << ":" << Client->GetConfig().Port << "\n" + << PrintToString(request) << Endl; - UNIT_ASSERT(status.ok()); - } else { - Y_FAIL("not allowed"); - } + grpc::ClientContext context; + auto status = Stub->ChooseProxy(&context, request, &response); - Cerr << response << "\n"; + Cerr << "ChooseProxy response:\n" << PrintToString(response) << Endl; - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK, - "proxy failure"); + UNIT_ASSERT_C(status.ok(), status.error_message()); + + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK, "proxy failure"); } void WriteToPQ( const TRequestWritePQ& writeRequest, const TString& data, const TString& ticket = "", - ETransport transport = ETransport::MsgBus, NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK, NMsgBusProxy::EResponseStatus expectedOwnerStatus = NMsgBusProxy::MSTATUS_OK ) { @@ -1095,27 +1058,11 @@ public: TString cookie = GetOwnership({writeRequest.Topic, writeRequest.Partition}, expectedOwnerStatus); THolder<NMsgBusProxy::TBusPersQueue> request = writeRequest.GetRequest(data, cookie); - if (!ticket.empty()) { + if (!ticket.empty()) request.Get()->Record.SetTicket(ticket); - } - NKikimrClient::TResponse response; - - if (transport == ETransport::GRpc) { - auto channel = grpc::CreateChannel("localhost:"+ToString(GRpcPort), grpc::InsecureChannelCredentials()); - auto stub(NKikimrClient::TGRpcServer::NewStub(channel)); - grpc::ClientContext context; - auto status = stub->PersQueueRequest(&context, request->Record, &response); - UNIT_ASSERT(status.ok()); - } else { - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* busResponse = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(busResponse); - - response.CopyFrom(busResponse->Record); - } + auto response = CallPersQueueGRPC(request->Record); - Cerr << response << "\n"; UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), expectedStatus, "proxy failure"); if (expectedStatus == NMsgBusProxy::MSTATUS_OK) { @@ -1126,10 +1073,9 @@ public: void WriteToPQ(const TString& topic, ui32 partition, const TString& sourceId, const ui64 seqNo, const TString& data, const TString& ticket = "", - ETransport transport = ETransport::MsgBus, NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK, NMsgBusProxy::EResponseStatus expectedOwnerStatus = NMsgBusProxy::MSTATUS_OK) { - WriteToPQ({topic, partition, sourceId, seqNo}, data, ticket, transport, expectedStatus, expectedOwnerStatus); + WriteToPQ({topic, partition, sourceId, seqNo}, data, ticket, expectedStatus, expectedOwnerStatus); } struct TReadDebugInfo { @@ -1149,22 +1095,20 @@ public: request.Get()->Record.SetTicket(ticket); } - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); + auto response = CallPersQueueGRPC(request->Record); - auto status = response->Record.GetStatus(); - auto errorCode = response->Record.GetErrorCode(); - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response->Record.GetErrorReason()); - UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response->Record.GetErrorReason()); + auto status = response.GetStatus(); + auto errorCode = response.GetErrorCode(); + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response.GetErrorReason()); + UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response.GetErrorReason()); if (expectedStatus == NMsgBusProxy::MSTATUS_OK) { - UNIT_ASSERT(response->Record.GetPartitionResponse().HasCmdReadResult()); - if (readCount > 0) UNIT_ASSERT_VALUES_EQUAL(response->Record.GetPartitionResponse().GetCmdReadResult().ResultSize(), readCount); + UNIT_ASSERT(response.GetPartitionResponse().HasCmdReadResult()); + if (readCount > 0) UNIT_ASSERT_VALUES_EQUAL(response.GetPartitionResponse().GetCmdReadResult().ResultSize(), readCount); } TReadDebugInfo info; - auto result = response->Record.GetPartitionResponse().GetCmdReadResult(); + auto result = response.GetPartitionResponse().GetCmdReadResult(); if (result.HasBlobsFromDisk()) info.BlobsFromDisk = result.GetBlobsFromDisk(); if (result.HasBlobsFromCache()) @@ -1193,14 +1137,12 @@ public: request.Get()->Record.SetTicket(ticket); } - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); + auto response = CallPersQueueGRPC(request->Record); - auto status = response->Record.GetStatus(); - auto errorCode = response->Record.GetErrorCode(); - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response->Record.GetErrorReason()); - UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response->Record.GetErrorReason()); + auto status = response.GetStatus(); + auto errorCode = response.GetErrorCode(); + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response.GetErrorReason()); + UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response.GetErrorReason()); } void SetClientOffsetPQ(const TString& topic, ui32 partition, ui64 offset, const TString& ticket = "", @@ -1211,24 +1153,21 @@ public: void FetchRequestPQ(const TVector<FetchPartInfo>& fetchParts, ui32 maxBytes, ui32 waitMs) { THolder<NMsgBusProxy::TBusPersQueue> request = TFetchRequestPQ().GetRequest(fetchParts, maxBytes, waitMs); - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); + CallPersQueueGRPC(request->Record); } void GetPartOffset(const TVector<std::pair<TString, TVector<ui32>>>& topicsAndParts, ui32 resCount, ui32 hasClientOffset, bool ok) { THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetPartOffsets().GetRequest(topicsAndParts); - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, + auto response = CallPersQueueGRPC(request->Record); + + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, "proxy failure"); if (!ok) return; - auto res = response->Record.GetMetaResponse().GetCmdGetPartitionOffsetsResult(); + auto res = response.GetMetaResponse().GetCmdGetPartitionOffsetsResult(); ui32 count = 0; ui32 clientOffsetCount = 0; for (ui32 i = 0; i < res.TopicResultSize(); ++i) { @@ -1247,11 +1186,9 @@ public: THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetClientInfo().GetRequest(topics, user); Cerr << "Request: " << request->Record << Endl; - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - Cerr << "Response: " << response->Record << "\n"; - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, + auto response = CallPersQueueGRPC(request->Record); + + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, "proxy failure"); THashSet<TString> good; THashSet<TString> bad; @@ -1263,7 +1200,7 @@ public: good.insert(t); } } - for (auto& tt : response->Record.GetMetaResponse().GetCmdGetReadSessionsInfoResult().GetTopicResult()) { + for (auto& tt : response.GetMetaResponse().GetCmdGetReadSessionsInfoResult().GetTopicResult()) { const auto& topic = tt.GetTopic(); if (bad.contains(topic)) { UNIT_ASSERT(tt.GetErrorCode() != (ui32)NPersQueue::NErrorCode::OK); @@ -1271,22 +1208,21 @@ public: UNIT_ASSERT(tt.GetErrorCode() == (ui32)NPersQueue::NErrorCode::OK); } } - return response->Record; + return response; } void GetPartStatus(const TVector<std::pair<TString, TVector<ui32>>>& topicsAndParts, ui32 resCount, bool ok) { THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetPartStatus().GetRequest(topicsAndParts); - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, + auto response = CallPersQueueGRPC(request->Record); + + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, "proxy failure"); if (!ok) return; - auto res = response->Record.GetMetaResponse().GetCmdGetPartitionStatusResult(); + auto res = response.GetMetaResponse().GetCmdGetPartitionStatusResult(); ui32 count = 0; for (ui32 i = 0; i < res.TopicResultSize(); ++i) { auto t = res.GetTopicResult(i); @@ -1306,20 +1242,19 @@ public: THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetPartLocations().GetRequest(topicsAndParts); - TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - if (response->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { + auto response = CallPersQueueGRPC(request->Record); + + if (response.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { doRetry = true; continue; } - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR, "proxy failure"); if (!ok) return {}; - auto res = response->Record.GetMetaResponse().GetCmdGetPartitionLocationsResult(); + auto res = response.GetMetaResponse().GetCmdGetPartitionLocationsResult(); for (ui32 i = 0; i < res.TopicResultSize(); ++i) { auto t = res.GetTopicResult(i); @@ -1347,17 +1282,17 @@ public: THolder<NMsgBusProxy::TBusPersQueue> request = TRequestDescribePQ().GetRequest(topics); TAutoPtr<NBus::TBusMessage> reply; - const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply); - UNIT_ASSERT(response); - if ((NMsgBusProxy::EResponseStatus)response->Record.GetStatus() != NMsgBusProxy::MSTATUS_OK) { + auto response = CallPersQueueGRPC(request->Record); + + if ((NMsgBusProxy::EResponseStatus)response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { UNIT_ASSERT(error); return {}; } - UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK, + UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK, "proxy failure"); - auto res = response->Record.GetMetaResponse().GetCmdGetTopicMetadataResult(); + auto res = response.GetMetaResponse().GetCmdGetTopicMetadataResult(); UNIT_ASSERT(topics.size() <= res.TopicInfoSize()); for (ui32 i = 0; i < res.TopicInfoSize(); ++i) { diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 517c6f9a9f8..90fa6c3d029 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -1865,8 +1865,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(status.ok()); } - server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 1, "valuevaluevalue1", "", ETransport::GRpc); - server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 2, "valuevaluevalue1", "", ETransport::GRpc); + server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 1, "valuevaluevalue1", ""); + server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 2, "valuevaluevalue1", ""); } Y_UNIT_TEST(WriteExistingBigValue) { @@ -1890,10 +1890,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.EnableLogs({ NKikimrServices::PERSQUEUE }); // empty data and sourceId - server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR); - server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR); - server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "a", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR); - server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "a", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_OK); + server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "", "", NMsgBusProxy::MSTATUS_ERROR); + server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "", "", NMsgBusProxy::MSTATUS_ERROR); + server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "a", "", NMsgBusProxy::MSTATUS_ERROR); + server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "a", "", NMsgBusProxy::MSTATUS_OK); } @@ -1906,7 +1906,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.AnnoyingClient->WriteToPQ( DEFAULT_TOPIC_NAME, 100500, "abacaba", 1, "valuevaluevalue1", "", - ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR + NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR ); } @@ -1917,7 +1917,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.AnnoyingClient->WriteToPQ( DEFAULT_TOPIC_NAME + "000", 1, "abacaba", 1, "valuevaluevalue1", "", - ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR + NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR ); } @@ -1946,7 +1946,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.AnnoyingClient->WriteToPQ( DEFAULT_TOPIC_NAME, 5, "abacaba", 1, "valuevaluevalue1", "", - ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR + NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR ); server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -1958,7 +1958,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 5, "abacaba", 1, "valuevaluevalue1"); server.AnnoyingClient->WriteToPQ( DEFAULT_TOPIC_NAME, 15, "abacaba", 1, "valuevaluevalue1", "", - ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR + NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR ); server.AnnoyingClient->AlterTopic(DEFAULT_TOPIC_NAME, 20); @@ -1990,7 +1990,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Y_UNIT_TEST(BigRead) { - NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root")); + NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(24_MB)); server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8_MB, 86400, 20000000, "user", 2000000); server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE }); @@ -1999,7 +1999,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { for (ui32 i = 0; i < 32; ++i) server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value); - // trying to read small PQ messages in a big messagebus event + // trying to read small PQ messages in a big gRPC event auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromDisk, 0); UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromCache, 4); @@ -2016,7 +2016,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { // expects that L2 size is 32Mb Y_UNIT_TEST(Cache) { - NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root")); + NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(18_MB)); server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8_MB, 86400); server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE }); @@ -2047,7 +2047,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } Y_UNIT_TEST(CacheHead) { - NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root")); + NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(16_MB)); server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 6_MB, 86400); server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE }); |
