diff options
author | alexnick <alexnick@ydb.tech> | 2022-12-02 15:48:13 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-12-02 15:48:13 +0300 |
commit | fed7a148a7ecfadf0db2b66ce34aec068ee02e97 (patch) | |
tree | 0bbe26e7f7e4d1d403d0d8e6138de80dbea0eed4 | |
parent | 1972eeb85e4faf4fbf5b5bc6d8224b0707728ed0 (diff) | |
download | ydb-fed7a148a7ecfadf0db2b66ce34aec068ee02e97.tar.gz |
viewer supports public describe for topic/consumer
progress
progress
progress
progress
progress
progress
progress
progress
progress
-rw-r--r-- | ydb/core/viewer/CMakeLists.txt | 10 | ||||
-rw-r--r-- | ydb/core/viewer/json_handlers_viewer.cpp | 3 | ||||
-rw-r--r-- | ydb/core/viewer/json_local_rpc.h | 288 | ||||
-rw-r--r-- | ydb/library/persqueue/tests/counters.cpp | 9 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.cpp | 15 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/schema_actors.h | 4 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_schema.cpp | 41 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 1 |
8 files changed, 358 insertions, 13 deletions
diff --git a/ydb/core/viewer/CMakeLists.txt b/ydb/core/viewer/CMakeLists.txt index 5b63ef674b..ea82c37547 100644 --- a/ydb/core/viewer/CMakeLists.txt +++ b/ydb/core/viewer/CMakeLists.txt @@ -21,10 +21,13 @@ target_link_libraries(ydb-core-viewer PUBLIC cpp-actors-core library-cpp-archive cpp-mime-types + cpp-protobuf-json ydb-core-base core-blobstorage-base blobstorage-vdisk-common core-client-server + ydb-core-grpc_services + core-grpc_services-local_rpc ydb-core-health_check ydb-core-node_whiteboard ydb-core-protos @@ -38,6 +41,8 @@ target_link_libraries(ydb-core-viewer PUBLIC api-protos lib-deprecated-kicli public-lib-json_value + api-grpc + cpp-client-ydb_types ) target_sources(ydb-core-viewer PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/viewer/json_handlers_vdisk.cpp @@ -59,10 +64,13 @@ target_link_libraries(ydb-core-viewer.global PUBLIC cpp-actors-core library-cpp-archive cpp-mime-types + cpp-protobuf-json ydb-core-base core-blobstorage-base blobstorage-vdisk-common core-client-server + ydb-core-grpc_services + core-grpc_services-local_rpc ydb-core-health_check ydb-core-node_whiteboard ydb-core-protos @@ -76,6 +84,8 @@ target_link_libraries(ydb-core-viewer.global PUBLIC api-protos lib-deprecated-kicli public-lib-json_value + api-grpc + cpp-client-ydb_types ) target_sources(ydb-core-viewer.global PRIVATE ${CMAKE_BINARY_DIR}/ydb/core/viewer/1cdc663173c623f6a008fb99b02498f1.cpp diff --git a/ydb/core/viewer/json_handlers_viewer.cpp b/ydb/core/viewer/json_handlers_viewer.cpp index a564147bb9..b3c7954d71 100644 --- a/ydb/core/viewer/json_handlers_viewer.cpp +++ b/ydb/core/viewer/json_handlers_viewer.cpp @@ -5,6 +5,7 @@ #include "json_vdiskinfo.h" #include "json_pdiskinfo.h" #include "json_describe.h" +#include "json_local_rpc.h" #include "json_hotkeys.h" #include "json_sysinfo.h" #include "json_tabletinfo.h" @@ -44,6 +45,8 @@ void TViewerJsonHandlers::Init() { JsonHandlers["/json/vdiskinfo"] = new TJsonHandler<TJsonVDiskInfo>; JsonHandlers["/json/pdiskinfo"] = new TJsonHandler<TJsonPDiskInfo>; JsonHandlers["/json/describe"] = new TJsonHandler<TJsonDescribe>; + JsonHandlers["/json/describe_topic"] = new TJsonHandler<TJsonDescribeTopic>; + JsonHandlers["/json/describe_consumer"] = new TJsonHandler<TJsonDescribeConsumer>; JsonHandlers["/json/hotkeys"] = new TJsonHandler<TJsonHotkeys>; JsonHandlers["/json/sysinfo"] = new TJsonHandler<TJsonSysInfo>; JsonHandlers["/json/tabletinfo"] = new TJsonHandler<TJsonTabletInfo>; diff --git a/ydb/core/viewer/json_local_rpc.h b/ydb/core/viewer/json_local_rpc.h new file mode 100644 index 0000000000..bab530b264 --- /dev/null +++ b/ydb/core/viewer/json_local_rpc.h @@ -0,0 +1,288 @@ +#pragma once +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/mon.h> +#include <library/cpp/protobuf/json/json2proto.h> +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include "viewer.h" +#include "json_pipe_req.h" + +#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> +#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> + +namespace NKikimr { +namespace NViewer { + +struct TEvLocalRpcPrivate { + enum EEv { + EvGrpcRequestResult = EventSpaceBegin(NActors::TEvents::ES_PRIVATE) + 100, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + template<class TProtoResult> + struct TEvGrpcRequestResult : NActors::TEventLocal<TEvGrpcRequestResult<TProtoResult>, EvGrpcRequestResult> { + THolder<TProtoResult> Message; + THolder<NYdb::TStatus> Status; + + TEvGrpcRequestResult() + {} + }; +}; + +using namespace NActors; +using NSchemeShard::TEvSchemeShard; + +template <class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoService, class TRpcEv> +class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>> { + using TThis = TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>; + using TBase = TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>>; + + using TBase::Send; + using TBase::PassAway; + using TBase::Become; + + IViewer* Viewer; + NMon::TEvHttpInfo::TPtr Event; + TAutoPtr<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>> DescribeResult; + + TJsonSettings JsonSettings; + ui32 Timeout = 0; + + TString Database; + + NThreading::TFuture<TProtoResponse> RpcFuture; + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::VIEWER_HANDLER; + } + + TJsonLocalRpc(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev) + : Viewer(viewer) + , Event(ev) + {} + + TProtoRequest Params2Proto(const TCgiParameters& params) { + TProtoRequest request; + using google::protobuf::Descriptor; + using google::protobuf::Reflection; + using google::protobuf::FieldDescriptor; + using google::protobuf::EnumDescriptor; + using google::protobuf::EnumValueDescriptor; + const Descriptor& descriptor = *request.GetDescriptor(); + const Reflection& reflection = *request.GetReflection(); + for (int idx = 0; idx < descriptor.field_count(); ++idx) { + const FieldDescriptor* field = descriptor.field(idx); + TString name; + name = field->name(); + TString value = params.Get(name); + if (!value.empty()) { + FieldDescriptor::CppType type = field->cpp_type(); + switch (type) { +#define CASE(BT, ST, CT) case FieldDescriptor::CPPTYPE_##BT: {\ + ST res = {};\ + if (TryFromString(value, res)) {\ + reflection.Set##CT(&request, field, res);\ + }\ + break;\ + } + + CASE(INT32, i32, Int32); + CASE(INT64, i64, Int64); + CASE(UINT32, ui32, UInt32); + CASE(UINT64, ui64, UInt64); + CASE(FLOAT, float, Float); + CASE(DOUBLE, double, Double); + CASE(BOOL, bool, Bool); + CASE(STRING, string, String); +#undef CASE + case FieldDescriptor::CPPTYPE_ENUM: { + const EnumDescriptor* enumDescriptor = field->enum_type(); + const EnumValueDescriptor* enumValueDescriptor = enumDescriptor->FindValueByName(value); + int number = 0; + if (enumValueDescriptor == nullptr && TryFromString(value, number)) { + enumValueDescriptor = enumDescriptor->FindValueByNumber(number); + } + if (enumValueDescriptor != nullptr) { + reflection.SetEnum(&request, field, enumValueDescriptor); + } + break; + } + case FieldDescriptor::CPPTYPE_MESSAGE: + break; + } + } + } + return request; + } + + TProtoRequest Params2Proto() { + TProtoRequest request; + NProtobufJson::TJson2ProtoConfig json2ProtoConfig; + auto postData = Event->Get()->Request.GetPostContent(); + if (!postData.empty()) { + try { + NProtobufJson::Json2Proto(postData, request, json2ProtoConfig); + } + catch (const yexception& e) { + Send(Event->Sender, new NMon::TEvHttpInfoRes(HTTPBADREQUEST, 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } + } else { + const auto& params(Event->Get()->Request.GetParams()); + return Params2Proto(params); + } + return request; + } + + void SendGrpcRequest() { + TProtoRequest request = Params2Proto(); + + RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), Database, + Event->Get()->UserToken, TlsActivationContext->ActorSystem()); + RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()] + (const NThreading::TFuture<TProtoResponse>& future) { + auto& response = future.GetValueSync(); + auto result = MakeHolder<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>>(); + Y_VERIFY(response.operation().ready()); + if (response.operation().status() == Ydb::StatusIds::SUCCESS) { + TProtoResult rs; + response.operation().result().UnpackTo(&rs); + result->Message = MakeHolder<TProtoResult>(rs); + } + NYql::TIssues issues; + NYql::IssuesFromMessage(response.operation().issues(), issues); + result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()), + std::move(issues)); + + actorSystem->Send(actorId, result.Release()); + }); + } + + + void Bootstrap() { + const auto& params(Event->Get()->Request.GetParams()); + JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), false); + JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false); + Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000); + Database = params.Get("database_path"); + + SendGrpcRequest(); + + Become(&TThis::StateRequestedDescribe, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup()); + } + + void Handle(typename TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>::TPtr& ev) { + DescribeResult = ev->Release(); + ReplyAndPassAway(); + } + + STATEFN(StateRequestedDescribe) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>, Handle); + cFunc(TEvents::TSystem::Wakeup, HandleTimeout); + } + } + + void ReplyAndPassAway() { + TStringStream json; + TString headers = Viewer->GetHTTPOKJSON(Event->Get()); + if (DescribeResult) { + if (!DescribeResult->Status->IsSuccess()) { + headers = HTTPBADREQUEST; + if (DescribeResult->Status->GetStatus() == NYdb::EStatus::UNAUTHORIZED) { + headers = HTTPFORBIDDENJSON; + } + } else { + TProtoToJson::ProtoToJson(json, *(DescribeResult->Message), JsonSettings); + } + } else { + json << "null"; + } + + Send(Event->Sender, new NMon::TEvHttpInfoRes(headers + json.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } + + void HandleTimeout() { + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPGATEWAYTIMEOUT(), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } +}; + + +using TJsonDescribeTopic = TJsonLocalRpc<Ydb::Topic::DescribeTopicRequest, + Ydb::Topic::DescribeTopicResponse, + Ydb::Topic::DescribeTopicResult, + Ydb::Topic::V1::TopicService, + NKikimr::NGRpcService::TEvDescribeTopicRequest>; + +using TJsonDescribeConsumer = TJsonLocalRpc<Ydb::Topic::DescribeConsumerRequest, + Ydb::Topic::DescribeConsumerResponse, + Ydb::Topic::DescribeConsumerResult, + Ydb::Topic::V1::TopicService, + NKikimr::NGRpcService::TEvDescribeConsumerRequest>; + +template <> +struct TJsonRequestParameters<TJsonDescribeTopic> { + static TString GetParameters() { + return R"___([{"name":"path","in":"query","description":"schema path","required":false,"type":"string"}, + {"name":"enums","in":"query","description":"convert enums to strings","required":false,"type":"boolean"}, + {"name":"ui64","in":"query","description":"return ui64 as number","required":false,"type":"boolean"}, + {"name":"timeout","in":"query","description":"timeout in ms","required":false,"type":"integer"}, + {"name":"database_path","in":"query","description":"database path","required":false,"type":"string"}, + {"name":"include_stats","in":"query","description":"include stat flag","required":false,"type":"bool"}])___"; + } +}; + +template <> +struct TJsonRequestSummary<TJsonDescribeTopic> { + static TString GetSummary() { + return "\"Topic schema detailed information\""; + } +}; + +template <> +struct TJsonRequestDescription<TJsonDescribeTopic> { + static TString GetDescription() { + return "\"Returns detailed information about topic\""; + } +}; + + +template <> +struct TJsonRequestParameters<TJsonDescribeConsumer> { + static TString GetParameters() { + return R"___([{"name":"path","in":"query","description":"schema path","required":false,"type":"string"}, + {"name":"enums","in":"query","description":"convert enums to strings","required":false,"type":"boolean"}, + {"name":"ui64","in":"query","description":"return ui64 as number","required":false,"type":"boolean"}, + {"name":"timeout","in":"query","description":"timeout in ms","required":false,"type":"integer"}, + {"name":"database_path","in":"query","description":"database path","required":false,"type":"string"}, + {"name":"consumer","in":"query","description":"consumer name","required":false,"type":"string"}, + {"name":"include_stats","in":"query","description":"include stat flag","required":false,"type":"bool"}])___"; + } +}; + +template <> +struct TJsonRequestSummary<TJsonDescribeConsumer> { + static TString GetSummary() { + return "\"Topic's consumer detailed information\""; + } +}; + +template <> +struct TJsonRequestDescription<TJsonDescribeConsumer> { + static TString GetDescription() { + return "\"Returns detailed information about topic's consumer\""; + } +}; + + +} +} diff --git a/ydb/library/persqueue/tests/counters.cpp b/ydb/library/persqueue/tests/counters.cpp index b48ed45e48..2f3af93080 100644 --- a/ydb/library/persqueue/tests/counters.cpp +++ b/ydb/library/persqueue/tests/counters.cpp @@ -20,15 +20,18 @@ NJson::TJsonValue SendQuery(ui16 port, const TString& query, bool mayFail) { TString firstLine = input.FirstLine(); const auto httpCode = ParseHttpRetCode(firstLine); + NJson::TJsonValue value; + bool res = NJson::ReadJsonTree(&input, &value); + + Cerr << "counters: " << value.GetStringRobust() << "\n"; + + UNIT_ASSERT(res); if (mayFail && httpCode != 200u) { return {}; } else { UNIT_ASSERT_VALUES_EQUAL(httpCode, 200u); } - NJson::TJsonValue value; - UNIT_ASSERT(NJson::ReadJsonTree(&input, &value)); - Cerr << "counters: " << value.GetStringRobust() << "\n"; return value; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 3fdfb0821f..0eddfe0cf2 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -443,12 +443,27 @@ TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopic { } +TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx * ctx) + : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->path()) + , TDescribeTopicActorImpl("") +{ +} + + + TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request) : TBase(request, request->GetProtoRequest()->path()) , TDescribeTopicActorImpl(request->GetProtoRequest()->consumer()) { } +TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::IRequestOpCtx * ctx) + : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->path()) + , TDescribeTopicActorImpl(dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->consumer()) +{ +} + + TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TString& consumer) : Consumer(consumer) { diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index e3834c298b..ec1182bd5c 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -117,6 +117,8 @@ using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo; public: TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request); + TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx * ctx); + ~TDescribeTopicActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -142,6 +144,8 @@ using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo; public: TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request); + TDescribeConsumerActor(NKikimr::NGRpcService::IRequestOpCtx * ctx); + ~TDescribeConsumerActor() = default; void Bootstrap(const NActors::TActorContext& ctx); diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp index b116f03c01..e020ffd64b 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp @@ -141,36 +141,38 @@ void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest: } +namespace NKikimr { +namespace NGRpcService { -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } @@ -178,10 +180,29 @@ void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEv ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) { +void TGRpcRequestProxy::Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } + + +#ifdef DECLARE_RPC +#error DECLARE_RPC macro already defined +#endif + +#define DECLARE_RPC(name) template<> IActor* TEv##name##Request::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { \ + return new NKikimr::NGRpcProxy::V1::T##name##Actor(msg);\ + } + +DECLARE_RPC(DescribeTopic); +DECLARE_RPC(DescribeConsumer); + +#undef DECLARE_RPC + + + +} +}
\ No newline at end of file diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 77c91b8b98..c4d2620d22 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -2658,6 +2658,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { const TString& clientDc, const TString& originDc, const TString& client, const TString& consumerPath) { NJson::TJsonValue counters; + if (clientDc.empty() && originDc.empty()) { counters = GetClientCountersLegacy(monPort, "pqproxy", session, client, consumerPath); } else { |