summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/client/flat_ut_client.h2
-rw-r--r--ydb/core/client/locks_ut.cpp2
-rw-r--r--ydb/core/testlib/test_client.cpp14
-rw-r--r--ydb/core/testlib/test_client.h10
-rw-r--r--ydb/core/testlib/test_pq_client.h249
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp28
6 files changed, 121 insertions, 184 deletions
diff --git a/ydb/core/client/flat_ut_client.h b/ydb/core/client/flat_ut_client.h
index 07f5b30cc6b..52608875aea 100644
--- a/ydb/core/client/flat_ut_client.h
+++ b/ydb/core/client/flat_ut_client.h
@@ -55,7 +55,7 @@ public:
TAutoPtr<NBus::TBusMessage> reply;
NBus::EMessageStatus msgStatus = SendWhenReady(request, reply);
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
- Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Release());
}
diff --git a/ydb/core/client/locks_ut.cpp b/ydb/core/client/locks_ut.cpp
index 11b76611e30..2617461edaf 100644
--- a/ydb/core/client/locks_ut.cpp
+++ b/ydb/core/client/locks_ut.cpp
@@ -43,7 +43,7 @@ public:
TAutoPtr<NBus::TBusMessage> reply;
NBus::EMessageStatus msgStatus = SendWhenReady(request, reply);
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
- Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Release());
}
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index ced40a8b5f8..b9073e5c279 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -1236,7 +1236,7 @@ namespace Tests {
SendAndWaitCompletion(request, reply);
#ifndef NDEBUG
- Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
#endif
return reply;
}
@@ -1353,7 +1353,7 @@ namespace Tests {
TAutoPtr<NBus::TBusMessage> reply;
NBus::EMessageStatus msgStatus = SendAndWaitCompletion(request, reply);
#ifndef NDEBUG
- Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
#endif
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record;
@@ -1370,7 +1370,7 @@ namespace Tests {
TAutoPtr<NBus::TBusMessage> reply;
NBus::EMessageStatus msgStatus = SendAndWaitCompletion(request, reply);
#ifndef NDEBUG
- Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
#endif
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record;
@@ -1836,7 +1836,7 @@ namespace Tests {
TAutoPtr<NBus::TBusMessage> reply;
auto msgStatus = WaitCompletion(descr.GetCreateTxId(), descr.GetSchemeshardId(), descr.GetPathId(), reply, timeout);
#ifndef NDEBUG
- Cout << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cout << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
#endif
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record;
@@ -1853,7 +1853,7 @@ namespace Tests {
NBus::EMessageStatus msgStatus = SendWhenReady(request, reply);
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
- Cerr << "TClient::Ls response: " << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cerr << "TClient::Ls response: " << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Release());
}
@@ -2008,7 +2008,7 @@ namespace Tests {
NBus::EMessageStatus msgStatus = SendWhenReady(readRequest, reply);
#ifndef NDEBUG
- Cerr << PrintResult<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
+ Cerr << PrintToString<NMsgBusProxy::TBusResponse>(reply.Get()) << Endl;
#endif
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
const NKikimrClient::TResponse &response = dynamic_cast<NMsgBusProxy::TBusResponse *>(reply.Get())->Record;
@@ -2039,7 +2039,7 @@ namespace Tests {
NBus::EMessageStatus msgStatus = SendWhenReady(deleteRequest, replyDelete);
#ifndef NDEBUG
- Cout << PrintResult<NMsgBusProxy::TBusResponse>(replyDelete.Get()) << Endl;
+ Cout << PrintToString<NMsgBusProxy::TBusResponse>(replyDelete.Get()) << Endl;
#endif
UNIT_ASSERT_VALUES_EQUAL(msgStatus, NBus::MESSAGE_OK);
const NKikimrClient::TResponse &responseDelete = dynamic_cast<NMsgBusProxy::TBusResponse *>(replyDelete.Get())->Record;
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index b535eb97d9e..4dc59109e90 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -95,6 +95,7 @@ namespace Tests {
ui16 Port;
ui16 GrpcPort = 0;
+ int GrpcMaxMessageSize = 0; // 0 - default (4_MB), -1 - no limit
NKikimrProto::TAuthConfig AuthConfig;
NKikimrPQ::TPQConfig PQConfig;
NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig;
@@ -141,6 +142,7 @@ namespace Tests {
std::shared_ptr<TGrpcServiceFactory> GrpcServiceFactory;
TServerSettings& SetGrpcPort(ui16 value) { GrpcPort = value; return *this; }
+ TServerSettings& SetGrpcMaxMessageSize(int value) { GrpcMaxMessageSize = value; return *this; }
TServerSettings& SetSupportsRedirect(bool value) { SupportsRedirect = value; return *this; }
TServerSettings& SetTracePath(const TString& value) { TracePath = value; return *this; }
TServerSettings& SetDomain(ui32 value) { Domain = value; return *this; }
@@ -486,7 +488,7 @@ namespace Tests {
THolder<NKesus::TEvKesus::TEvGetConfigResult> GetKesusConfig(TTestActorRuntime* runtime, const TString& kesusPath);
protected:
- TString PrintResult(const ::google::protobuf::Message& msg, size_t maxSz = 1000) {
+ TString PrintToString(const ::google::protobuf::Message& msg, size_t maxSz = 1000) {
TString s;
::google::protobuf::TextFormat::PrintToString(msg, &s);
if (s.size() > maxSz) {
@@ -497,9 +499,9 @@ namespace Tests {
}
template <class TMsg>
- TString PrintResult(NBus::TBusMessage* msg, size_t maxSz = 1000) {
- auto res = dynamic_cast<TMsg*>(msg);
- return PrintResult(res->Record, maxSz);
+ TString PrintToString(const NBus::TBusMessage* msg, size_t maxSz = 1000) {
+ auto res = dynamic_cast<const TMsg*>(msg);
+ return PrintToString(res->Record, maxSz);
}
// Waits for kikimr server to become ready
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index b8b1975d11b..c742c8aedea 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -459,11 +459,6 @@ struct TRequestDescribePQ {
}
};
-enum class ETransport {
- MsgBus,
- GRpc
-};
-
struct TPQTestClusterInfo {
TString Balancer;
bool Enabled;
@@ -487,9 +482,10 @@ private:
const ui16 GRpcPort;
NClient::TKikimr Kikimr;
THolder<NYdb::TDriver> Driver;
+ std::unique_ptr<NKikimrClient::TGRpcServer::Stub> Stub;
+
ui64 TopicsVersion = 0;
bool UseConfigTables = true;
-
public:
void RunYqlSchemeQuery(TString query, bool expectSuccess = true) {
auto tableClient = NYdb::NTable::TTableClient(*Driver);
@@ -527,21 +523,29 @@ public:
}
- TFlatMsgBusPQClient(
- const Tests::TServerSettings& settings, ui16 grpc, TMaybe<TString> databaseName = Nothing()
- )
+ TFlatMsgBusPQClient(const Tests::TServerSettings& settings, ui16 grpc, TMaybe<TString> databaseName = Nothing())
: TFlatMsgBusClient(settings)
, Settings(settings)
, GRpcPort(grpc)
, Kikimr(GetClientConfig())
{
+ TString endpoint = TStringBuilder() << "localhost:" << GRpcPort;
auto driverConfig = NYdb::TDriverConfig()
- .SetEndpoint(TStringBuilder() << "localhost:" << GRpcPort)
- .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
- if (databaseName) {
+ .SetEndpoint(endpoint)
+ .SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ if (databaseName)
driverConfig.SetDatabase(*databaseName);
- }
Driver.Reset(MakeHolder<NYdb::TDriver>(driverConfig));
+
+ grpc::ChannelArguments args;
+ if (settings.GrpcMaxMessageSize != 0)
+ {
+ args.SetMaxReceiveMessageSize(settings.GrpcMaxMessageSize);
+ args.SetMaxSendMessageSize(settings.GrpcMaxMessageSize);
+ }
+ auto channel = grpc::CreateCustomChannel(endpoint, grpc::InsecureChannelCredentials(), args);
+
+ Stub = NKikimrClient::TGRpcServer::NewStub(channel);
}
~TFlatMsgBusPQClient() {
@@ -735,20 +739,7 @@ public:
NKikimrClient::TPersQueueRequest request;
request.MutableMetaRequest()->MutableCmdGetTopicMetadata()->AddTopic(name);
- Cerr << "RequestTopicMetadata request: " << name << " to server " << Client->GetConfig().Ip << ":" << Client->GetConfig().Port << Endl;
-
- NKikimrClient::TResponse response;
- auto channel = grpc::CreateChannel("localhost:" + ToString(GRpcPort), grpc::InsecureChannelCredentials());
- auto stub(NKikimrClient::TGRpcServer::NewStub(channel));
- grpc::ClientContext context;
- auto status = stub->PersQueueRequest(&context, request, &response);
-
- Cerr << "RequestTopicMetadata response: " << PrintResult(response) << Endl;
-
- UNIT_ASSERT(status.ok());
- UNIT_ASSERT(response.HasErrorCode());
-
- return response;
+ return CallPersQueueGRPC(request);
}
ui32 GetTopicVersionFromMetadata(const TString& name, ui64 cacheSize = 0)
@@ -819,19 +810,20 @@ public:
return response.GetErrorCode() == (ui32)NPersQueue::NErrorCode::UNKNOWN_TOPIC;
}
+ NKikimrClient::TResponse CallPersQueueGRPC(const NKikimrClient::TPersQueueRequest& request, ui64 maxPrintSize = 1000) {
+ Cerr << "CallPersQueueGRPC request to " << Client->GetConfig().Ip << ":" << Client->GetConfig().Port << "\n"
+ << PrintToString(request, maxPrintSize) << Endl;
- const NMsgBusProxy::TBusResponse* SendAndGetReply(TAutoPtr<NMsgBusProxy::TBusPersQueue> request,
- TAutoPtr<NBus::TBusMessage>& reply, ui64 maxPrintSize = 0) {
- NBus::EMessageStatus status = SyncCall(request, reply);
- TString msgStr;
- UNIT_ASSERT_VALUES_EQUAL(status, NBus::MESSAGE_OK);
- if (maxPrintSize) {
- msgStr = PrintResult<NMsgBusProxy::TBusResponse>(reply.Get(), maxPrintSize);
- } else {
- msgStr = PrintResult<NMsgBusProxy::TBusResponse>(reply.Get());
- }
- Cerr << msgStr << Endl;
- return dynamic_cast<NMsgBusProxy::TBusResponse*>(reply.Get());
+ NKikimrClient::TResponse response;
+ grpc::ClientContext context;
+ grpc::Status status = Stub->PersQueueRequest(&context, request, &response);
+
+ UNIT_ASSERT_C(status.ok(), status.error_message());
+ UNIT_ASSERT(response.HasErrorCode());
+
+ Cerr << "CallPersQueueGRPC response:\n" << PrintToString(response, maxPrintSize) << Endl;
+
+ return response;
}
void CreateConsumer(const TString& oldName) {
@@ -902,18 +894,12 @@ public:
} while (true);
}
- void CreateTopic(const TRequestCreatePQ& createRequest, bool doWait = true
- ) {
+ void CreateTopic(const TRequestCreatePQ& createRequest, bool doWait = true) {
const TInstant start = TInstant::Now();
- THolder<NMsgBusProxy::TBusPersQueue> request = createRequest.GetRequest();
-
ui32 prevVersion = GetTopicVersionFromMetadata(createRequest.Topic);
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request, reply);
- UNIT_ASSERT(response);
- UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK,
- TStringBuilder() << "proxy failure: " << response->Record.DebugString());
+
+ CallPersQueueGRPC(createRequest.GetRequest()->Record);
AddTopic(createRequest.Topic);
while (doWait && GetTopicVersionFromPath(createRequest.Topic) != prevVersion + 1) {
@@ -976,15 +962,11 @@ public:
) {
Y_VERIFY(name.StartsWith("rt3."));
TRequestAlterPQ requestDescr(name, nParts, cacheSize, lifetimeS, fillPartitionConfig, mirrorFrom);
- THolder<NMsgBusProxy::TBusPersQueue> request = requestDescr.GetRequest();
+ THolder<NMsgBusProxy::TBusPersQueue> alterRequest = requestDescr.GetRequest();
ui32 prevVersion = GetTopicVersionFromMetadata(name);
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
- UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK,
- response->Record.DebugString().c_str());
+ CallPersQueueGRPC(alterRequest->Record);
const TInstant start = TInstant::Now();
AlterTopic();
@@ -1013,13 +995,9 @@ public:
) {
Y_VERIFY(name.StartsWith("rt3."));
- THolder<NMsgBusProxy::TBusPersQueue> request = TRequestDeletePQ{name}.GetRequest();
+ THolder<NMsgBusProxy::TBusPersQueue> deleteRequest = TRequestDeletePQ{name}.GetRequest();
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
- UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)expectedStatus,
- "proxy failure");
+ CallPersQueueGRPC(deleteRequest->Record);
// wait for drop completion
if (expectedStatus == NPersQueue::NErrorCode::OK) {
@@ -1042,52 +1020,37 @@ public:
}
}
- TString GetOwnership(const TRequestGetOwnership& getOwnership,
- NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK) {
-
- THolder<NMsgBusProxy::TBusPersQueue> request = getOwnership.GetRequest();
-
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
-
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), expectedStatus,
- "proxy failure");
+ TString GetOwnership(const TRequestGetOwnership& getOwnership, NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK) {
+ auto response = CallPersQueueGRPC(getOwnership.GetRequest()->Record);
if (expectedStatus == NMsgBusProxy::MSTATUS_OK) {
- UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK,
- "write failure");
- return response->Record.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
+ UNIT_ASSERT_VALUES_EQUAL_C((ui32)response.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK, "write failure");
+ return response.GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
}
return "";
}
- void ChooseProxy(ETransport transport = ETransport::MsgBus) {
- THolder<NMsgBusProxy::TBusChooseProxy> request = MakeHolder<NMsgBusProxy::TBusChooseProxy>();
+ void ChooseProxy() {
+ NKikimrClient::TChooseProxyRequest request;
NKikimrClient::TResponse response;
- if (transport == ETransport::GRpc) {
- auto channel = grpc::CreateChannel("localhost:"+ToString(GRpcPort), grpc::InsecureChannelCredentials());
- auto stub(NKikimrClient::TGRpcServer::NewStub(channel));
- grpc::ClientContext context;
- auto status = stub->ChooseProxy(&context, request->Record, &response);
+ Cerr << "ChooseProxy request to server " << Client->GetConfig().Ip << ":" << Client->GetConfig().Port << "\n"
+ << PrintToString(request) << Endl;
- UNIT_ASSERT(status.ok());
- } else {
- Y_FAIL("not allowed");
- }
+ grpc::ClientContext context;
+ auto status = Stub->ChooseProxy(&context, request, &response);
- Cerr << response << "\n";
+ Cerr << "ChooseProxy response:\n" << PrintToString(response) << Endl;
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK,
- "proxy failure");
+ UNIT_ASSERT_C(status.ok(), status.error_message());
+
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK, "proxy failure");
}
void WriteToPQ(
const TRequestWritePQ& writeRequest, const TString& data,
const TString& ticket = "",
- ETransport transport = ETransport::MsgBus,
NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK,
NMsgBusProxy::EResponseStatus expectedOwnerStatus = NMsgBusProxy::MSTATUS_OK
) {
@@ -1095,27 +1058,11 @@ public:
TString cookie = GetOwnership({writeRequest.Topic, writeRequest.Partition}, expectedOwnerStatus);
THolder<NMsgBusProxy::TBusPersQueue> request = writeRequest.GetRequest(data, cookie);
- if (!ticket.empty()) {
+ if (!ticket.empty())
request.Get()->Record.SetTicket(ticket);
- }
- NKikimrClient::TResponse response;
-
- if (transport == ETransport::GRpc) {
- auto channel = grpc::CreateChannel("localhost:"+ToString(GRpcPort), grpc::InsecureChannelCredentials());
- auto stub(NKikimrClient::TGRpcServer::NewStub(channel));
- grpc::ClientContext context;
- auto status = stub->PersQueueRequest(&context, request->Record, &response);
- UNIT_ASSERT(status.ok());
- } else {
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* busResponse = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(busResponse);
-
- response.CopyFrom(busResponse->Record);
- }
+ auto response = CallPersQueueGRPC(request->Record);
- Cerr << response << "\n";
UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), expectedStatus,
"proxy failure");
if (expectedStatus == NMsgBusProxy::MSTATUS_OK) {
@@ -1126,10 +1073,9 @@ public:
void WriteToPQ(const TString& topic, ui32 partition, const TString& sourceId, const ui64 seqNo, const TString& data,
const TString& ticket = "",
- ETransport transport = ETransport::MsgBus,
NMsgBusProxy::EResponseStatus expectedStatus = NMsgBusProxy::MSTATUS_OK,
NMsgBusProxy::EResponseStatus expectedOwnerStatus = NMsgBusProxy::MSTATUS_OK) {
- WriteToPQ({topic, partition, sourceId, seqNo}, data, ticket, transport, expectedStatus, expectedOwnerStatus);
+ WriteToPQ({topic, partition, sourceId, seqNo}, data, ticket, expectedStatus, expectedOwnerStatus);
}
struct TReadDebugInfo {
@@ -1149,22 +1095,20 @@ public:
request.Get()->Record.SetTicket(ticket);
}
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
+ auto response = CallPersQueueGRPC(request->Record);
- auto status = response->Record.GetStatus();
- auto errorCode = response->Record.GetErrorCode();
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response->Record.GetErrorReason());
- UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response->Record.GetErrorReason());
+ auto status = response.GetStatus();
+ auto errorCode = response.GetErrorCode();
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response.GetErrorReason());
+ UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response.GetErrorReason());
if (expectedStatus == NMsgBusProxy::MSTATUS_OK) {
- UNIT_ASSERT(response->Record.GetPartitionResponse().HasCmdReadResult());
- if (readCount > 0) UNIT_ASSERT_VALUES_EQUAL(response->Record.GetPartitionResponse().GetCmdReadResult().ResultSize(), readCount);
+ UNIT_ASSERT(response.GetPartitionResponse().HasCmdReadResult());
+ if (readCount > 0) UNIT_ASSERT_VALUES_EQUAL(response.GetPartitionResponse().GetCmdReadResult().ResultSize(), readCount);
}
TReadDebugInfo info;
- auto result = response->Record.GetPartitionResponse().GetCmdReadResult();
+ auto result = response.GetPartitionResponse().GetCmdReadResult();
if (result.HasBlobsFromDisk())
info.BlobsFromDisk = result.GetBlobsFromDisk();
if (result.HasBlobsFromCache())
@@ -1193,14 +1137,12 @@ public:
request.Get()->Record.SetTicket(ticket);
}
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
+ auto response = CallPersQueueGRPC(request->Record);
- auto status = response->Record.GetStatus();
- auto errorCode = response->Record.GetErrorCode();
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response->Record.GetErrorReason());
- UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response->Record.GetErrorReason());
+ auto status = response.GetStatus();
+ auto errorCode = response.GetErrorCode();
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)status, expectedStatus, response.GetErrorReason());
+ UNIT_ASSERT_VALUES_EQUAL_C((ui32)errorCode, (ui32)expectedError, response.GetErrorReason());
}
void SetClientOffsetPQ(const TString& topic, ui32 partition, ui64 offset, const TString& ticket = "",
@@ -1211,24 +1153,21 @@ public:
void FetchRequestPQ(const TVector<FetchPartInfo>& fetchParts, ui32 maxBytes, ui32 waitMs) {
THolder<NMsgBusProxy::TBusPersQueue> request = TFetchRequestPQ().GetRequest(fetchParts, maxBytes, waitMs);
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
+ CallPersQueueGRPC(request->Record);
}
void GetPartOffset(const TVector<std::pair<TString, TVector<ui32>>>& topicsAndParts, ui32 resCount, ui32 hasClientOffset, bool ok) {
THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetPartOffsets().GetRequest(topicsAndParts);
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
+ auto response = CallPersQueueGRPC(request->Record);
+
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
"proxy failure");
if (!ok)
return;
- auto res = response->Record.GetMetaResponse().GetCmdGetPartitionOffsetsResult();
+ auto res = response.GetMetaResponse().GetCmdGetPartitionOffsetsResult();
ui32 count = 0;
ui32 clientOffsetCount = 0;
for (ui32 i = 0; i < res.TopicResultSize(); ++i) {
@@ -1247,11 +1186,9 @@ public:
THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetClientInfo().GetRequest(topics, user);
Cerr << "Request: " << request->Record << Endl;
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
- Cerr << "Response: " << response->Record << "\n";
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
+ auto response = CallPersQueueGRPC(request->Record);
+
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
"proxy failure");
THashSet<TString> good;
THashSet<TString> bad;
@@ -1263,7 +1200,7 @@ public:
good.insert(t);
}
}
- for (auto& tt : response->Record.GetMetaResponse().GetCmdGetReadSessionsInfoResult().GetTopicResult()) {
+ for (auto& tt : response.GetMetaResponse().GetCmdGetReadSessionsInfoResult().GetTopicResult()) {
const auto& topic = tt.GetTopic();
if (bad.contains(topic)) {
UNIT_ASSERT(tt.GetErrorCode() != (ui32)NPersQueue::NErrorCode::OK);
@@ -1271,22 +1208,21 @@ public:
UNIT_ASSERT(tt.GetErrorCode() == (ui32)NPersQueue::NErrorCode::OK);
}
}
- return response->Record;
+ return response;
}
void GetPartStatus(const TVector<std::pair<TString, TVector<ui32>>>& topicsAndParts, ui32 resCount, bool ok) {
THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetPartStatus().GetRequest(topicsAndParts);
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
+ auto response = CallPersQueueGRPC(request->Record);
+
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
"proxy failure");
if (!ok)
return;
- auto res = response->Record.GetMetaResponse().GetCmdGetPartitionStatusResult();
+ auto res = response.GetMetaResponse().GetCmdGetPartitionStatusResult();
ui32 count = 0;
for (ui32 i = 0; i < res.TopicResultSize(); ++i) {
auto t = res.GetTopicResult(i);
@@ -1306,20 +1242,19 @@ public:
THolder<NMsgBusProxy::TBusPersQueue> request = TRequestGetPartLocations().GetRequest(topicsAndParts);
- TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
- if (response->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
+ auto response = CallPersQueueGRPC(request->Record);
+
+ if (response.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
doRetry = true;
continue;
}
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), ok ? NMsgBusProxy::MSTATUS_OK : NMsgBusProxy::MSTATUS_ERROR,
"proxy failure");
if (!ok)
return {};
- auto res = response->Record.GetMetaResponse().GetCmdGetPartitionLocationsResult();
+ auto res = response.GetMetaResponse().GetCmdGetPartitionLocationsResult();
for (ui32 i = 0; i < res.TopicResultSize(); ++i) {
auto t = res.GetTopicResult(i);
@@ -1347,17 +1282,17 @@ public:
THolder<NMsgBusProxy::TBusPersQueue> request = TRequestDescribePQ().GetRequest(topics);
TAutoPtr<NBus::TBusMessage> reply;
- const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request.Release(), reply);
- UNIT_ASSERT(response);
- if ((NMsgBusProxy::EResponseStatus)response->Record.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
+ auto response = CallPersQueueGRPC(request->Record);
+
+ if ((NMsgBusProxy::EResponseStatus)response.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
UNIT_ASSERT(error);
return {};
}
- UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK,
+ UNIT_ASSERT_VALUES_EQUAL_C((NMsgBusProxy::EResponseStatus)response.GetStatus(), NMsgBusProxy::MSTATUS_OK,
"proxy failure");
- auto res = response->Record.GetMetaResponse().GetCmdGetTopicMetadataResult();
+ auto res = response.GetMetaResponse().GetCmdGetTopicMetadataResult();
UNIT_ASSERT(topics.size() <= res.TopicInfoSize());
for (ui32 i = 0; i < res.TopicInfoSize(); ++i) {
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 517c6f9a9f8..90fa6c3d029 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -1865,8 +1865,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(status.ok());
}
- server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 1, "valuevaluevalue1", "", ETransport::GRpc);
- server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 2, "valuevaluevalue1", "", ETransport::GRpc);
+ server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 1, "valuevaluevalue1", "");
+ server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "abacaba", 2, "valuevaluevalue1", "");
}
Y_UNIT_TEST(WriteExistingBigValue) {
@@ -1890,10 +1890,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.EnableLogs({ NKikimrServices::PERSQUEUE });
// empty data and sourceId
- server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR);
- server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR);
- server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "a", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR);
- server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "a", "", ETransport::MsgBus, NMsgBusProxy::MSTATUS_OK);
+ server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "", "", NMsgBusProxy::MSTATUS_ERROR);
+ server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "", "", NMsgBusProxy::MSTATUS_ERROR);
+ server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "", 1, "a", "", NMsgBusProxy::MSTATUS_ERROR);
+ server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 1, "a", 1, "a", "", NMsgBusProxy::MSTATUS_OK);
}
@@ -1906,7 +1906,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->WriteToPQ(
DEFAULT_TOPIC_NAME, 100500, "abacaba", 1, "valuevaluevalue1", "",
- ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
+ NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
);
}
@@ -1917,7 +1917,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->WriteToPQ(
DEFAULT_TOPIC_NAME + "000", 1, "abacaba", 1, "valuevaluevalue1", "",
- ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
+ NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
);
}
@@ -1946,7 +1946,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->WriteToPQ(
DEFAULT_TOPIC_NAME, 5, "abacaba", 1, "valuevaluevalue1", "",
- ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
+ NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
);
server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -1958,7 +1958,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->WriteToPQ(DEFAULT_TOPIC_NAME, 5, "abacaba", 1, "valuevaluevalue1");
server.AnnoyingClient->WriteToPQ(
DEFAULT_TOPIC_NAME, 15, "abacaba", 1, "valuevaluevalue1", "",
- ETransport::MsgBus, NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
+ NMsgBusProxy::MSTATUS_ERROR, NMsgBusProxy::MSTATUS_ERROR
);
server.AnnoyingClient->AlterTopic(DEFAULT_TOPIC_NAME, 20);
@@ -1990,7 +1990,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Y_UNIT_TEST(BigRead) {
- NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root"));
+ NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(24_MB));
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8_MB, 86400, 20000000, "user", 2000000);
server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE });
@@ -1999,7 +1999,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
for (ui32 i = 0; i < 32; ++i)
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);
- // trying to read small PQ messages in a big messagebus event
+ // trying to read small PQ messages in a big gRPC event
auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb
UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromDisk, 0);
UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromCache, 4);
@@ -2016,7 +2016,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
// expects that L2 size is 32Mb
Y_UNIT_TEST(Cache) {
- NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root"));
+ NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(18_MB));
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8_MB, 86400);
server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE });
@@ -2047,7 +2047,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
Y_UNIT_TEST(CacheHead) {
- NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root"));
+ NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root").SetGrpcMaxMessageSize(16_MB));
server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 6_MB, 86400);
server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE });