aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-12-02 15:48:13 +0300
committeralexnick <alexnick@ydb.tech>2022-12-02 15:48:13 +0300
commitfed7a148a7ecfadf0db2b66ce34aec068ee02e97 (patch)
tree0bbe26e7f7e4d1d403d0d8e6138de80dbea0eed4
parent1972eeb85e4faf4fbf5b5bc6d8224b0707728ed0 (diff)
downloadydb-fed7a148a7ecfadf0db2b66ce34aec068ee02e97.tar.gz
viewer supports public describe for topic/consumer
progress progress progress progress progress progress progress progress progress
-rw-r--r--ydb/core/viewer/CMakeLists.txt10
-rw-r--r--ydb/core/viewer/json_handlers_viewer.cpp3
-rw-r--r--ydb/core/viewer/json_local_rpc.h288
-rw-r--r--ydb/library/persqueue/tests/counters.cpp9
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp15
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.h4
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.cpp41
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp1
8 files changed, 358 insertions, 13 deletions
diff --git a/ydb/core/viewer/CMakeLists.txt b/ydb/core/viewer/CMakeLists.txt
index 5b63ef674b..ea82c37547 100644
--- a/ydb/core/viewer/CMakeLists.txt
+++ b/ydb/core/viewer/CMakeLists.txt
@@ -21,10 +21,13 @@ target_link_libraries(ydb-core-viewer PUBLIC
cpp-actors-core
library-cpp-archive
cpp-mime-types
+ cpp-protobuf-json
ydb-core-base
core-blobstorage-base
blobstorage-vdisk-common
core-client-server
+ ydb-core-grpc_services
+ core-grpc_services-local_rpc
ydb-core-health_check
ydb-core-node_whiteboard
ydb-core-protos
@@ -38,6 +41,8 @@ target_link_libraries(ydb-core-viewer PUBLIC
api-protos
lib-deprecated-kicli
public-lib-json_value
+ api-grpc
+ cpp-client-ydb_types
)
target_sources(ydb-core-viewer PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/viewer/json_handlers_vdisk.cpp
@@ -59,10 +64,13 @@ target_link_libraries(ydb-core-viewer.global PUBLIC
cpp-actors-core
library-cpp-archive
cpp-mime-types
+ cpp-protobuf-json
ydb-core-base
core-blobstorage-base
blobstorage-vdisk-common
core-client-server
+ ydb-core-grpc_services
+ core-grpc_services-local_rpc
ydb-core-health_check
ydb-core-node_whiteboard
ydb-core-protos
@@ -76,6 +84,8 @@ target_link_libraries(ydb-core-viewer.global PUBLIC
api-protos
lib-deprecated-kicli
public-lib-json_value
+ api-grpc
+ cpp-client-ydb_types
)
target_sources(ydb-core-viewer.global PRIVATE
${CMAKE_BINARY_DIR}/ydb/core/viewer/1cdc663173c623f6a008fb99b02498f1.cpp
diff --git a/ydb/core/viewer/json_handlers_viewer.cpp b/ydb/core/viewer/json_handlers_viewer.cpp
index a564147bb9..b3c7954d71 100644
--- a/ydb/core/viewer/json_handlers_viewer.cpp
+++ b/ydb/core/viewer/json_handlers_viewer.cpp
@@ -5,6 +5,7 @@
#include "json_vdiskinfo.h"
#include "json_pdiskinfo.h"
#include "json_describe.h"
+#include "json_local_rpc.h"
#include "json_hotkeys.h"
#include "json_sysinfo.h"
#include "json_tabletinfo.h"
@@ -44,6 +45,8 @@ void TViewerJsonHandlers::Init() {
JsonHandlers["/json/vdiskinfo"] = new TJsonHandler<TJsonVDiskInfo>;
JsonHandlers["/json/pdiskinfo"] = new TJsonHandler<TJsonPDiskInfo>;
JsonHandlers["/json/describe"] = new TJsonHandler<TJsonDescribe>;
+ JsonHandlers["/json/describe_topic"] = new TJsonHandler<TJsonDescribeTopic>;
+ JsonHandlers["/json/describe_consumer"] = new TJsonHandler<TJsonDescribeConsumer>;
JsonHandlers["/json/hotkeys"] = new TJsonHandler<TJsonHotkeys>;
JsonHandlers["/json/sysinfo"] = new TJsonHandler<TJsonSysInfo>;
JsonHandlers["/json/tabletinfo"] = new TJsonHandler<TJsonTabletInfo>;
diff --git a/ydb/core/viewer/json_local_rpc.h b/ydb/core/viewer/json_local_rpc.h
new file mode 100644
index 0000000000..bab530b264
--- /dev/null
+++ b/ydb/core/viewer/json_local_rpc.h
@@ -0,0 +1,288 @@
+#pragma once
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/mon.h>
+#include <library/cpp/protobuf/json/json2proto.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include "viewer.h"
+#include "json_pipe_req.h"
+
+#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
+#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
+
+namespace NKikimr {
+namespace NViewer {
+
+struct TEvLocalRpcPrivate {
+ enum EEv {
+ EvGrpcRequestResult = EventSpaceBegin(NActors::TEvents::ES_PRIVATE) + 100,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ template<class TProtoResult>
+ struct TEvGrpcRequestResult : NActors::TEventLocal<TEvGrpcRequestResult<TProtoResult>, EvGrpcRequestResult> {
+ THolder<TProtoResult> Message;
+ THolder<NYdb::TStatus> Status;
+
+ TEvGrpcRequestResult()
+ {}
+ };
+};
+
+using namespace NActors;
+using NSchemeShard::TEvSchemeShard;
+
+template <class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoService, class TRpcEv>
+class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>> {
+ using TThis = TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>;
+ using TBase = TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>>;
+
+ using TBase::Send;
+ using TBase::PassAway;
+ using TBase::Become;
+
+ IViewer* Viewer;
+ NMon::TEvHttpInfo::TPtr Event;
+ TAutoPtr<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>> DescribeResult;
+
+ TJsonSettings JsonSettings;
+ ui32 Timeout = 0;
+
+ TString Database;
+
+ NThreading::TFuture<TProtoResponse> RpcFuture;
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::VIEWER_HANDLER;
+ }
+
+ TJsonLocalRpc(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev)
+ : Viewer(viewer)
+ , Event(ev)
+ {}
+
+ TProtoRequest Params2Proto(const TCgiParameters& params) {
+ TProtoRequest request;
+ using google::protobuf::Descriptor;
+ using google::protobuf::Reflection;
+ using google::protobuf::FieldDescriptor;
+ using google::protobuf::EnumDescriptor;
+ using google::protobuf::EnumValueDescriptor;
+ const Descriptor& descriptor = *request.GetDescriptor();
+ const Reflection& reflection = *request.GetReflection();
+ for (int idx = 0; idx < descriptor.field_count(); ++idx) {
+ const FieldDescriptor* field = descriptor.field(idx);
+ TString name;
+ name = field->name();
+ TString value = params.Get(name);
+ if (!value.empty()) {
+ FieldDescriptor::CppType type = field->cpp_type();
+ switch (type) {
+#define CASE(BT, ST, CT) case FieldDescriptor::CPPTYPE_##BT: {\
+ ST res = {};\
+ if (TryFromString(value, res)) {\
+ reflection.Set##CT(&request, field, res);\
+ }\
+ break;\
+ }
+
+ CASE(INT32, i32, Int32);
+ CASE(INT64, i64, Int64);
+ CASE(UINT32, ui32, UInt32);
+ CASE(UINT64, ui64, UInt64);
+ CASE(FLOAT, float, Float);
+ CASE(DOUBLE, double, Double);
+ CASE(BOOL, bool, Bool);
+ CASE(STRING, string, String);
+#undef CASE
+ case FieldDescriptor::CPPTYPE_ENUM: {
+ const EnumDescriptor* enumDescriptor = field->enum_type();
+ const EnumValueDescriptor* enumValueDescriptor = enumDescriptor->FindValueByName(value);
+ int number = 0;
+ if (enumValueDescriptor == nullptr && TryFromString(value, number)) {
+ enumValueDescriptor = enumDescriptor->FindValueByNumber(number);
+ }
+ if (enumValueDescriptor != nullptr) {
+ reflection.SetEnum(&request, field, enumValueDescriptor);
+ }
+ break;
+ }
+ case FieldDescriptor::CPPTYPE_MESSAGE:
+ break;
+ }
+ }
+ }
+ return request;
+ }
+
+ TProtoRequest Params2Proto() {
+ TProtoRequest request;
+ NProtobufJson::TJson2ProtoConfig json2ProtoConfig;
+ auto postData = Event->Get()->Request.GetPostContent();
+ if (!postData.empty()) {
+ try {
+ NProtobufJson::Json2Proto(postData, request, json2ProtoConfig);
+ }
+ catch (const yexception& e) {
+ Send(Event->Sender, new NMon::TEvHttpInfoRes(HTTPBADREQUEST, 0, NMon::IEvHttpInfoRes::EContentType::Custom));
+ PassAway();
+ }
+ } else {
+ const auto& params(Event->Get()->Request.GetParams());
+ return Params2Proto(params);
+ }
+ return request;
+ }
+
+ void SendGrpcRequest() {
+ TProtoRequest request = Params2Proto();
+
+ RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), Database,
+ Event->Get()->UserToken, TlsActivationContext->ActorSystem());
+ RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()]
+ (const NThreading::TFuture<TProtoResponse>& future) {
+ auto& response = future.GetValueSync();
+ auto result = MakeHolder<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>>();
+ Y_VERIFY(response.operation().ready());
+ if (response.operation().status() == Ydb::StatusIds::SUCCESS) {
+ TProtoResult rs;
+ response.operation().result().UnpackTo(&rs);
+ result->Message = MakeHolder<TProtoResult>(rs);
+ }
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(response.operation().issues(), issues);
+ result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()),
+ std::move(issues));
+
+ actorSystem->Send(actorId, result.Release());
+ });
+ }
+
+
+ void Bootstrap() {
+ const auto& params(Event->Get()->Request.GetParams());
+ JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), false);
+ JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false);
+ Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000);
+ Database = params.Get("database_path");
+
+ SendGrpcRequest();
+
+ Become(&TThis::StateRequestedDescribe, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
+ }
+
+ void Handle(typename TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>::TPtr& ev) {
+ DescribeResult = ev->Release();
+ ReplyAndPassAway();
+ }
+
+ STATEFN(StateRequestedDescribe) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>, Handle);
+ cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ }
+ }
+
+ void ReplyAndPassAway() {
+ TStringStream json;
+ TString headers = Viewer->GetHTTPOKJSON(Event->Get());
+ if (DescribeResult) {
+ if (!DescribeResult->Status->IsSuccess()) {
+ headers = HTTPBADREQUEST;
+ if (DescribeResult->Status->GetStatus() == NYdb::EStatus::UNAUTHORIZED) {
+ headers = HTTPFORBIDDENJSON;
+ }
+ } else {
+ TProtoToJson::ProtoToJson(json, *(DescribeResult->Message), JsonSettings);
+ }
+ } else {
+ json << "null";
+ }
+
+ Send(Event->Sender, new NMon::TEvHttpInfoRes(headers + json.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
+ PassAway();
+ }
+
+ void HandleTimeout() {
+ Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
+ PassAway();
+ }
+};
+
+
+using TJsonDescribeTopic = TJsonLocalRpc<Ydb::Topic::DescribeTopicRequest,
+ Ydb::Topic::DescribeTopicResponse,
+ Ydb::Topic::DescribeTopicResult,
+ Ydb::Topic::V1::TopicService,
+ NKikimr::NGRpcService::TEvDescribeTopicRequest>;
+
+using TJsonDescribeConsumer = TJsonLocalRpc<Ydb::Topic::DescribeConsumerRequest,
+ Ydb::Topic::DescribeConsumerResponse,
+ Ydb::Topic::DescribeConsumerResult,
+ Ydb::Topic::V1::TopicService,
+ NKikimr::NGRpcService::TEvDescribeConsumerRequest>;
+
+template <>
+struct TJsonRequestParameters<TJsonDescribeTopic> {
+ static TString GetParameters() {
+ return R"___([{"name":"path","in":"query","description":"schema path","required":false,"type":"string"},
+ {"name":"enums","in":"query","description":"convert enums to strings","required":false,"type":"boolean"},
+ {"name":"ui64","in":"query","description":"return ui64 as number","required":false,"type":"boolean"},
+ {"name":"timeout","in":"query","description":"timeout in ms","required":false,"type":"integer"},
+ {"name":"database_path","in":"query","description":"database path","required":false,"type":"string"},
+ {"name":"include_stats","in":"query","description":"include stat flag","required":false,"type":"bool"}])___";
+ }
+};
+
+template <>
+struct TJsonRequestSummary<TJsonDescribeTopic> {
+ static TString GetSummary() {
+ return "\"Topic schema detailed information\"";
+ }
+};
+
+template <>
+struct TJsonRequestDescription<TJsonDescribeTopic> {
+ static TString GetDescription() {
+ return "\"Returns detailed information about topic\"";
+ }
+};
+
+
+template <>
+struct TJsonRequestParameters<TJsonDescribeConsumer> {
+ static TString GetParameters() {
+ return R"___([{"name":"path","in":"query","description":"schema path","required":false,"type":"string"},
+ {"name":"enums","in":"query","description":"convert enums to strings","required":false,"type":"boolean"},
+ {"name":"ui64","in":"query","description":"return ui64 as number","required":false,"type":"boolean"},
+ {"name":"timeout","in":"query","description":"timeout in ms","required":false,"type":"integer"},
+ {"name":"database_path","in":"query","description":"database path","required":false,"type":"string"},
+ {"name":"consumer","in":"query","description":"consumer name","required":false,"type":"string"},
+ {"name":"include_stats","in":"query","description":"include stat flag","required":false,"type":"bool"}])___";
+ }
+};
+
+template <>
+struct TJsonRequestSummary<TJsonDescribeConsumer> {
+ static TString GetSummary() {
+ return "\"Topic's consumer detailed information\"";
+ }
+};
+
+template <>
+struct TJsonRequestDescription<TJsonDescribeConsumer> {
+ static TString GetDescription() {
+ return "\"Returns detailed information about topic's consumer\"";
+ }
+};
+
+
+}
+}
diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp
index b48ed45e48..2f3af93080 100644
--- a/ydb/library/persqueue/tests/counters.cpp
+++ b/ydb/library/persqueue/tests/counters.cpp
@@ -20,15 +20,18 @@ NJson::TJsonValue SendQuery(ui16 port, const TString& query, bool mayFail) {
TString firstLine = input.FirstLine();
const auto httpCode = ParseHttpRetCode(firstLine);
+ NJson::TJsonValue value;
+ bool res = NJson::ReadJsonTree(&input, &value);
+
+ Cerr << "counters: " << value.GetStringRobust() << "\n";
+
+ UNIT_ASSERT(res);
if (mayFail && httpCode != 200u) {
return {};
} else {
UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u);
}
- NJson::TJsonValue value;
- UNIT_ASSERT(NJson::ReadJsonTree(&input, &value));
- Cerr << "counters: " << value.GetStringRobust() << "\n";
return value;
}
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 3fdfb0821f..0eddfe0cf2 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -443,12 +443,27 @@ TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopic
{
}
+TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx * ctx)
+ : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->path())
+ , TDescribeTopicActorImpl("")
+{
+}
+
+
+
TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request)
: TBase(request, request->GetProtoRequest()->path())
, TDescribeTopicActorImpl(request->GetProtoRequest()->consumer())
{
}
+TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::IRequestOpCtx * ctx)
+ : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->path())
+ , TDescribeTopicActorImpl(dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->consumer())
+{
+}
+
+
TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TString& consumer)
: Consumer(consumer)
{
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h
index e3834c298b..ec1182bd5c 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.h
+++ b/ydb/services/persqueue_v1/actors/schema_actors.h
@@ -117,6 +117,8 @@ using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
public:
TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request);
+ TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx * ctx);
+
~TDescribeTopicActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -142,6 +144,8 @@ using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
public:
TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request);
+ TDescribeConsumerActor(NKikimr::NGRpcService::IRequestOpCtx * ctx);
+
~TDescribeConsumerActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
index b116f03c01..e020ffd64b 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
@@ -141,36 +141,38 @@ void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest:
}
+namespace NKikimr {
+namespace NGRpcService {
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
@@ -178,10 +180,29 @@ void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEv
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
+void TGRpcRequestProxy::Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
+
+
+#ifdef DECLARE_RPC
+#error DECLARE_RPC macro already defined
+#endif
+
+#define DECLARE_RPC(name) template<> IActor* TEv##name##Request::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { \
+ return new NKikimr::NGRpcProxy::V1::T##name##Actor(msg);\
+ }
+
+DECLARE_RPC(DescribeTopic);
+DECLARE_RPC(DescribeConsumer);
+
+#undef DECLARE_RPC
+
+
+
+}
+} \ No newline at end of file
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 77c91b8b98..c4d2620d22 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -2658,6 +2658,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
const TString& clientDc, const TString& originDc,
const TString& client, const TString& consumerPath) {
NJson::TJsonValue counters;
+
if (clientDc.empty() && originDc.empty()) {
counters = GetClientCountersLegacy(monPort, "pqproxy", session, client, consumerPath);
} else {