diff options
author | Daniil Cherednik <dcherednik@ydb.tech> | 2024-01-29 13:54:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-29 13:54:43 +0100 |
commit | f883501d79b6d54d1ea61c7ac390d5f302250214 (patch) | |
tree | 4501544b1eb3544a78dbbc81ab2b017975488480 | |
parent | 04c3f1c13b10986ac522030e869d903a93992b36 (diff) | |
download | ydb-f883501d79b6d54d1ea61c7ac390d5f302250214.tar.gz |
Use runtime grpc event dispatching instead of legacy one for PQ scheme events (#1326)
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 7 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy.cpp | 12 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_proxy_handle_methods.h | 12 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls.h | 16 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls_topic.h | 24 | ||||
-rw-r--r-- | ydb/core/grpc_services/service_topic.h | 32 | ||||
-rw-r--r-- | ydb/core/viewer/json_local_rpc.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/events.h | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_schema.cpp | 231 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_schema.h | 78 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue.cpp | 33 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/services_initializer.cpp | 4 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/services_initializer.h | 12 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/topic.cpp | 40 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/topic.h | 3 |
16 files changed, 269 insertions, 242 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 12272ce9af..344dbac4f8 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -1437,7 +1437,12 @@ public: void Pass(const IFacilityProvider& facility) override { this->Span_.End(); - PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility); + try { + PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility); + } catch (const std::exception& ex) { + this->RaiseIssue(NYql::TIssue{TStringBuilder() << "unexpected exception: " << ex.what()}); + this->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR); + } } TRateLimiterMode GetRlMode() const override { diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 3e44414f5d..ca2d67d498 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -606,20 +606,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) { HFunc(TEvStreamTopicDirectReadRequest, PreHandle); HFunc(TEvCommitOffsetRequest, PreHandle); HFunc(TEvPQReadInfoRequest, PreHandle); - HFunc(TEvPQDropTopicRequest, PreHandle); - HFunc(TEvPQCreateTopicRequest, PreHandle); - HFunc(TEvPQAlterTopicRequest, PreHandle); - HFunc(TEvPQAddReadRuleRequest, PreHandle); - HFunc(TEvPQRemoveReadRuleRequest, PreHandle); - HFunc(TEvPQDescribeTopicRequest, PreHandle); HFunc(TEvDiscoverPQClustersRequest, PreHandle); HFunc(TEvCoordinationSessionRequest, PreHandle); - HFunc(TEvDropTopicRequest, PreHandle); - HFunc(TEvCreateTopicRequest, PreHandle); - HFunc(TEvAlterTopicRequest, PreHandle); - HFunc(TEvDescribeTopicRequest, PreHandle); - HFunc(TEvDescribeConsumerRequest, PreHandle); - HFunc(TEvDescribePartitionRequest, PreHandle); HFunc(TEvNodeCheckRequest, PreHandle); HFunc(TEvProxyRuntimeEvent, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h index 155f344d7a..662d37f2d4 100644 --- a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h +++ b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h @@ -15,22 +15,10 @@ protected: static void Handle(TEvStreamTopicDirectReadRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvNodeCheckRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx); - static void Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx); }; } diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 0a02845f93..1f92eb4f6b 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -13,10 +13,8 @@ #include <ydb/public/api/protos/ydb_discovery.pb.h> #include <ydb/public/api/protos/ydb_monitoring.pb.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> -#include <ydb/public/api/protos/ydb_table.pb.h> #include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h> #include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> -#include <ydb/public/api/protos/ydb_topic.pb.h> #include <ydb/public/api/protos/ydb_federation_discovery.pb.h> #include <ydb/public/api/grpc/draft/dummy.pb.h> @@ -69,21 +67,7 @@ using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvSt using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer, TRateLimiterMode::RuManual>; using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>; using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>; -using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>; -using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>; -using TEvPQAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAlterTopic, Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse, true>; -using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDescribeTopic, Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse, true>; -using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>; -using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>; - //TODO: Change this to runtime dispatching! -using TEvDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDropTopic, Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse, true, TRateLimiterMode::Rps>; -using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTopic, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse, true, TRateLimiterMode::Rps>; -using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>; -using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>; -using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>; -using TEvDescribePartitionRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribePartition, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse, true, TRateLimiterMode::Rps>; - using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>; using TEvListFederationDatabasesRequest = TGRpcRequestWrapper<TRpcServices::EvListFederationDatabases, Ydb::FederationDiscovery::ListFederationDatabasesRequest, Ydb::FederationDiscovery::ListFederationDatabasesResponse, true>; diff --git a/ydb/core/grpc_services/rpc_calls_topic.h b/ydb/core/grpc_services/rpc_calls_topic.h new file mode 100644 index 0000000000..f595a3399b --- /dev/null +++ b/ydb/core/grpc_services/rpc_calls_topic.h @@ -0,0 +1,24 @@ +#pragma once + +#include <ydb/public/api/protos/ydb_topic.pb.h> +#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> + +#include "rpc_calls.h" + +namespace NKikimr::NGRpcService { + +using TEvDropTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse>; +using TEvCreateTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse>; +using TEvAlterTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse>; +using TEvDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse>; +using TEvDescribeConsumerRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse>; +using TEvDescribePartitionRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>; + +using TEvPQDropTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse>; +using TEvPQCreateTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse>; +using TEvPQAlterTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse>; +using TEvPQDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse>; +using TEvPQAddReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse>; +using TEvPQRemoveReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse>; + +} diff --git a/ydb/core/grpc_services/service_topic.h b/ydb/core/grpc_services/service_topic.h new file mode 100644 index 0000000000..a99316b03a --- /dev/null +++ b/ydb/core/grpc_services/service_topic.h @@ -0,0 +1,32 @@ +#pragma once + +#include <memory> + +namespace NKikimr { + +namespace NGRpcProxy::V1 { +class IClustersCfgProvider; +struct TClustersCfg; +} + +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>); +void DoAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoDescribeConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoDescribePartitionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); + +void DoPQDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoPQCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>); +void DoPQAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>); +void DoPQDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoPQAddReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); +void DoPQRemoveReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f); + +} +} diff --git a/ydb/core/viewer/json_local_rpc.h b/ydb/core/viewer/json_local_rpc.h index 13dc14ee88..6e3c54e4c5 100644 --- a/ydb/core/viewer/json_local_rpc.h +++ b/ydb/core/viewer/json_local_rpc.h @@ -10,7 +10,7 @@ #include "json_pipe_req.h" #include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/rpc_calls_topic.h> #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/public/sdk/cpp/client/ydb_types/status/status.h> diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index f147836b54..953b067ad0 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -3,7 +3,7 @@ #include "partition_id.h" #include <ydb/core/base/events.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/rpc_calls_topic.h> #include <ydb/core/protos/pqconfig.pb.h> #include <ydb/core/persqueue/key.h> #include <ydb/core/persqueue/percentile_counter.h> @@ -15,7 +15,6 @@ #include <util/generic/guid.h> - namespace NKikimr::NGRpcProxy::V1 { using namespace Ydb; diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp index 634a82be08..00e678183d 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp @@ -1,15 +1,12 @@ #include "grpc_pq_schema.h" #include "actors/schema_actors.h" +#include "actors/events.h" -#include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/tx/scheme_board/cache.h> -#include <ydb/core/ydb_convert/ydb_convert.h> - -#include <ydb/library/persqueue/obfuscate/obfuscate.h> -#include <ydb/library/persqueue/topic_parser/topic_parser.h> +#include <ydb/core/persqueue/cluster_tracker.h> #include <algorithm> +#include <shared_mutex> using namespace NActors; using namespace NKikimrClient; @@ -22,21 +19,43 @@ namespace NKikimr::NGRpcProxy::V1 { using namespace PersQueue::V1; +class TPQSchemaService : public NActors::TActorBootstrapped<TPQSchemaService>, IClustersCfgProvider { +public: + TPQSchemaService(IClustersCfgProvider** p); -IActor* CreatePQSchemaService(const TActorId& schemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) { - return new TPQSchemaService(schemeCache, counters); -} + void Bootstrap(const TActorContext& ctx); + TIntrusiveConstPtr<TClustersCfg> GetCfg() const override; +private: + TString AvailableLocalCluster(); -TPQSchemaService::TPQSchemaService(const TActorId& schemeCache, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) - : SchemeCache(schemeCache) - , Counters(counters) - , LocalCluster("") -{ + STFUNC(StateFunc) { + switch (ev->GetTypeRewrite()) { + hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle); + } + } + +private: + void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev); + + mutable std::shared_mutex Mtx; + TIntrusivePtr<TClustersCfg> ClustersCfg; +}; + +IActor* CreatePQSchemaService(IClustersCfgProvider** p) { + return new TPQSchemaService(p); } +TPQSchemaService::TPQSchemaService(IClustersCfgProvider** p) + : ClustersCfg(MakeIntrusive<TClustersCfg>()) +{ + // used from grpc handlers. + // GetCfg method in called in the grpc thread context + // We have guarantee this object is created before grpc start + // and the the object destroyed after grpc stop + *p = this; +} void TPQSchemaService::Bootstrap(const TActorContext& ctx) { if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { // ToDo[migration]: switch to haveClusters @@ -49,156 +68,164 @@ void TPQSchemaService::Bootstrap(const TActorContext& ctx) { Become(&TThis::StateFunc); } - void TPQSchemaService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev) { Y_ABORT_UNLESS(ev->Get()->ClustersList); Y_ABORT_UNLESS(ev->Get()->ClustersList->Clusters.size()); const auto& clusters = ev->Get()->ClustersList->Clusters; - LocalCluster = {}; + auto cfg = MakeIntrusive<TClustersCfg>(); auto it = std::find_if(begin(clusters), end(clusters), [](const auto& cluster) { return cluster.IsLocal; }); if (it != end(clusters)) { - LocalCluster = it->Name; + cfg->LocalCluster = it->Name; } - Clusters.resize(clusters.size()); + cfg->Clusters.resize(clusters.size()); for (size_t i = 0; i < clusters.size(); ++i) { - Clusters[i] = clusters[i].Name; + cfg->Clusters[i] = clusters[i].Name; } + + std::unique_lock lock(Mtx); + ClustersCfg = cfg; } -// unused ? -// google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> FillResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code) { -// google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> res; -// FillIssue(res.Add(), code, errorReason); -// return res; -// } +TIntrusiveConstPtr<TClustersCfg> TPQSchemaService::GetCfg() const { + std::shared_lock lock(Mtx); + return ClustersCfg; +} +} -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) { +namespace NKikimr { +namespace NGRpcService { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new drop topic request"); +void EnsureReq(const IRequestOpCtx* ctx, const TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>& cfg) { + if (Y_UNLIKELY(!ctx)) + throw yexception() << "no req ctx after cast"; - ctx.Register(new TPQDropTopicActor(ev->Release().Release())); + if (Y_UNLIKELY(!cfg)) + throw yexception() << "no cluster cfg provided"; } +void EnsureReq(const IRequestOpCtx* ctx) { + if (Y_UNLIKELY(!ctx)) + throw yexception() << "no req ctx after cast"; +} -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) { +void DoDropTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) { + auto p = dynamic_cast<TEvDropTopicRequest*>(ctx.release()); - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new drop topic request"); + EnsureReq(p); - ctx.Register(new TDropTopicActor(ev->Release().Release())); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new drop topic request"); + f.RegisterActor(new NGRpcProxy::V1::TDropTopicActor(p)); } +void DoCreateTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f, + TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg> cfg) +{ + auto p = dynamic_cast<TEvCreateTopicRequest*>(ctx.release()); + EnsureReq(p); -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Alter topic request"); - ctx.Register(new TPQAlterTopicActor(ev->Release().Release(), LocalCluster)); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new create topic request"); + f.RegisterActor(new NGRpcProxy::V1::TCreateTopicActor(p, cfg->LocalCluster, cfg->Clusters)); } -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Alter topic request"); - ctx.Register(new TAlterTopicActor(ev->Release().Release())); -} +void DoAlterTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) { + auto p = dynamic_cast<TEvAlterTopicRequest*>(ctx.release()); + EnsureReq(p); -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Add read rules request"); - ctx.Register(new TAddReadRuleActor(ev->Release().Release())); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new alter topic request"); + f.RegisterActor(new NGRpcProxy::V1::TAlterTopicActor(p)); } -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Remove read rules request"); - ctx.Register(new TRemoveReadRuleActor(ev->Release().Release())); -} +void DoDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) { + auto p = dynamic_cast<TEvDescribeTopicRequest*>(ctx.release()); -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new create topic request"); - ctx.Register(new TPQCreateTopicActor(ev->Release().Release(), LocalCluster, Clusters)); -} + EnsureReq(p); -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new create topic request"); - ctx.Register(new TCreateTopicActor(ev->Release().Release(), LocalCluster, Clusters)); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe topic request"); + f.RegisterActor(new NGRpcProxy::V1::TDescribeTopicActor(p)); } +void DoDescribeConsumerRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) { + auto p = dynamic_cast<TEvDescribeConsumerRequest*>(ctx.release()); -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe topic request"); - ctx.Register(new TPQDescribeTopicActor(ev->Release().Release())); -} + EnsureReq(p); -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe topic request"); - ctx.Register(new TDescribeTopicActor(ev->Release().Release())); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe consumer request"); + f.RegisterActor(new NGRpcProxy::V1::TDescribeConsumerActor(p)); } -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe consumer request"); - ctx.Register(new TDescribeConsumerActor(ev->Release().Release())); -} +void DoDescribePartitionRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) { + auto p = dynamic_cast<TEvDescribePartitionRequest*>(ctx.release()); -void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe partition request"); - ctx.Register(new TDescribePartitionActor(ev->Release().Release())); -} + EnsureReq(p); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe partition request"); + f.RegisterActor(new NGRpcProxy::V1::TDescribePartitionActor(p)); } -namespace NKikimr { -namespace NGRpcService { +void DoPQDropTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) { + auto p = dynamic_cast<TEvPQDropTopicRequest*>(ctx.release()); -void TGRpcRequestProxyHandleMethods::Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); -} + EnsureReq(p); -void TGRpcRequestProxyHandleMethods::Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Drop topic request"); + f.RegisterActor(new NGRpcProxy::V1::TPQDropTopicActor(p)); } -void TGRpcRequestProxyHandleMethods::Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); -} +void DoPQCreateTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f, + TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg> cfg) +{ + auto p = dynamic_cast<TEvPQCreateTopicRequest*>(ctx.release()); -void TGRpcRequestProxyHandleMethods::Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); -} + EnsureReq(p, cfg); -void TGRpcRequestProxyHandleMethods::Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Create topic request"); + f.RegisterActor(new NGRpcProxy::V1::TPQCreateTopicActor(p, cfg->LocalCluster, cfg->Clusters)); } -void TGRpcRequestProxyHandleMethods::Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); -} +void DoPQAlterTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f, + TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg> cfg) +{ + auto p = dynamic_cast<TEvPQAlterTopicRequest*>(ctx.release()); -void TGRpcRequestProxyHandleMethods::Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); -} + EnsureReq(p, cfg); -void TGRpcRequestProxyHandleMethods::Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Alter topic request"); + f.RegisterActor(new NGRpcProxy::V1::TPQAlterTopicActor(p, cfg->LocalCluster)); } -void TGRpcRequestProxyHandleMethods::Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); -} +void DoPQDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) { + auto p = dynamic_cast<TEvPQDescribeTopicRequest*>(ctx.release()); + + EnsureReq(p); -void NKikimr::NGRpcService::TGRpcRequestProxyHandleMethods::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe topic request"); + f.RegisterActor(new NGRpcProxy::V1::TPQDescribeTopicActor(p)); } -void TGRpcRequestProxyHandleMethods::Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); -} +void DoPQAddReadRuleRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) { + auto p = dynamic_cast<TEvPQAddReadRuleRequest*>(ctx.release()); + + EnsureReq(p); -void TGRpcRequestProxyHandleMethods::Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Add read rules request"); + f.RegisterActor(new NGRpcProxy::V1::TAddReadRuleActor(p)); } +void DoPQRemoveReadRuleRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) { + auto p = dynamic_cast<TEvPQRemoveReadRuleRequest*>(ctx.release()); + + EnsureReq(p); + + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Remove read rules request"); + f.RegisterActor(new NGRpcProxy::V1::TRemoveReadRuleActor(p)); +} #ifdef DECLARE_RPC #error DECLARE_RPC macro already defined @@ -214,7 +241,5 @@ DECLARE_RPC(DescribePartition); #undef DECLARE_RPC - - } } diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h index 02b1987179..27df9d36fd 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.h +++ b/ydb/services/persqueue_v1/grpc_pq_schema.h @@ -1,74 +1,28 @@ #pragma once -#include "actors/events.h" - -#include <ydb/core/client/server/grpc_base.h> -#include <ydb/core/persqueue/cluster_tracker.h> - -#include <ydb/library/actors/core/actor_bootstrapped.h> -#include <ydb/library/actors/core/actorsystem.h> - -#include <util/generic/hash.h> -#include <util/system/mutex.h> - +#include <ydb/library/actors/core/actorid.h> +#include <ydb/library/actors/core/actor.h> +#include <util/generic/ptr.h> namespace NKikimr::NGRpcProxy::V1 { -inline TActorId GetPQSchemaServiceActorID() { - return TActorId(0, "PQSchmSvc"); +inline NActors::TActorId GetPQSchemaServiceActorID() { + return NActors::TActorId(0, "PQSchmSvc"); } -IActor* CreatePQSchemaService(const NActors::TActorId& schemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters); +struct TClustersCfg : public TThrRefBase { + TClustersCfg() + : LocalCluster("") + {} + TString LocalCluster; + TVector<TString> Clusters; +}; -class TPQSchemaService : public NActors::TActorBootstrapped<TPQSchemaService> { +class IClustersCfgProvider { public: - TPQSchemaService(const NActors::TActorId& schemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters); - - void Bootstrap(const TActorContext& ctx); - -private: - TString AvailableLocalCluster(); - - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - HFunc(NKikimr::NGRpcService::TEvPQDropTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvPQCreateTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvPQAlterTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvPQAddReadRuleRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvPQDescribeTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDropTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvCreateTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvAlterTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDescribeTopicRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDescribeConsumerRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDescribePartitionRequest, Handle); - hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle); - } - } - -private: - void Handle(NKikimr::NGRpcService::TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx); - - void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev); - - NActors::TActorId SchemeCache; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; - - TVector<TString> Clusters; - TString LocalCluster; + virtual ~IClustersCfgProvider() = default; + virtual TIntrusiveConstPtr<TClustersCfg> GetCfg() const = 0; }; +NActors::IActor* CreatePQSchemaService(IClustersCfgProvider** cfgProvider); } diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp index b0a3360d53..d545d2e73e 100644 --- a/ydb/services/persqueue_v1/persqueue.cpp +++ b/ydb/services/persqueue_v1/persqueue.cpp @@ -2,7 +2,8 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/counters.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/rpc_calls_topic.h> +#include <ydb/core/grpc_services/service_topic.h> #include <ydb/core/grpc_services/grpc_helper.h> #include <ydb/core/tx/scheme_board/cache.h> @@ -23,7 +24,7 @@ TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TInt void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrpc::TLoggerPtr logger) { CQ_ = cq; - ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute(); + ServicesInitializer(ActorSystem_, SchemeCache, Counters_, &ClustersCfgProvider).Execute(); if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { SetupIncomingRequests(std::move(logger)); @@ -31,7 +32,7 @@ void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp } void TGRpcPersQueueService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { - + using namespace std::placeholders; auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_); { @@ -90,23 +91,29 @@ void TGRpcPersQueueService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { }) ADD_REQUEST(DropTopic, PersQueueService, DropTopicRequest, DropTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDropTopicRequest(ctx)); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQDropTopicRequest(ctx, &DoPQDropTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) ADD_REQUEST(CreateTopic, PersQueueService, CreateTopicRequest, CreateTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQCreateTopicRequest(ctx)); - }) + auto clusterCfg = ClustersCfgProvider->GetCfg(); + ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQCreateTopicRequest(ctx, std::bind(DoPQCreateTopicRequest, _1, _2, clusterCfg), TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(AlterTopic, PersQueueService, AlterTopicRequest, AlterTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAlterTopicRequest(ctx)); - }) + auto clusterCfg = ClustersCfgProvider->GetCfg(); + ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQAlterTopicRequest(ctx, std::bind(DoPQAlterTopicRequest, _1, _2, clusterCfg), TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(DescribeTopic, PersQueueService, DescribeTopicRequest, DescribeTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDescribeTopicRequest(ctx)); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQDescribeTopicRequest(ctx, &DoPQDescribeTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(AddReadRule, PersQueueService, AddReadRuleRequest, AddReadRuleResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAddReadRuleRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQAddReadRuleRequest(ctx, &DoPQAddReadRuleRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); }) + ADD_REQUEST(RemoveReadRule, PersQueueService, RemoveReadRuleRequest, RemoveReadRuleResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQRemoveReadRuleRequest(ctx)); + ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQRemoveReadRuleRequest(ctx, &DoPQRemoveReadRuleRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); }) #undef ADD_REQUEST diff --git a/ydb/services/persqueue_v1/persqueue.h b/ydb/services/persqueue_v1/persqueue.h index 348be0f72e..efb6804609 100644 --- a/ydb/services/persqueue_v1/persqueue.h +++ b/ydb/services/persqueue_v1/persqueue.h @@ -6,6 +6,7 @@ #include <ydb/library/grpc/server/grpc_server.h> #include <ydb/core/grpc_services/base/base_service.h> +#include <ydb/core/grpc_services/service_topic.h> namespace NKikimr { @@ -28,6 +29,7 @@ private: void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) override; NActors::TActorId SchemeCache; + NKikimr::NGRpcProxy::V1::IClustersCfgProvider* ClustersCfgProvider = nullptr; }; } // namespace V1 diff --git a/ydb/services/persqueue_v1/services_initializer.cpp b/ydb/services/persqueue_v1/services_initializer.cpp index afb18f384b..2a80ee6b00 100644 --- a/ydb/services/persqueue_v1/services_initializer.cpp +++ b/ydb/services/persqueue_v1/services_initializer.cpp @@ -58,10 +58,12 @@ void ServicesInitializer::InitReadService() { void ServicesInitializer::InitSchemaService() { auto serviceId = NGRpcProxy::V1::GetPQSchemaServiceActorID(); + static NGRpcProxy::V1::IClustersCfgProvider* providerService; if (RequiresServiceRegistration(serviceId)) { - auto service = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters); + auto service = NGRpcProxy::V1::CreatePQSchemaService(&providerService); RegisterService(serviceId, service); } + *ClusterCfgProvider = providerService; } } // namespace V1 diff --git a/ydb/services/persqueue_v1/services_initializer.h b/ydb/services/persqueue_v1/services_initializer.h index 1d604f35a9..938d1d8d40 100644 --- a/ydb/services/persqueue_v1/services_initializer.h +++ b/ydb/services/persqueue_v1/services_initializer.h @@ -3,6 +3,11 @@ #include <ydb/library/actors/core/actorsystem.h> namespace NKikimr { + +namespace NGRpcProxy::V1 { +class IClustersCfgProvider; +} + namespace NGRpcService { namespace V1 { @@ -10,10 +15,12 @@ class ServicesInitializer { public: ServicesInitializer(NActors::TActorSystem* actorSystem, NActors::TActorId schemeCache, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + NGRpcProxy::V1::IClustersCfgProvider** p) : ActorSystem(actorSystem) , SchemeCache(schemeCache) - , Counters(counters) { + , Counters(counters) + , ClusterCfgProvider(p) { } void Execute(); @@ -31,6 +38,7 @@ private: NActors::TActorSystem* ActorSystem; NActors::TActorId SchemeCache; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + NGRpcProxy::V1::IClustersCfgProvider** ClusterCfgProvider; }; diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index 40a7f2c65a..a1a697c651 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -2,9 +2,10 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/counters.h> -#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/rpc_calls_topic.h> #include <ydb/core/grpc_services/grpc_helper.h> #include <ydb/core/grpc_services/service_table.h> +#include <ydb/core/grpc_services/service_topic.h> #include <ydb/core/tx/scheme_board/cache.h> #include "actors/update_offsets_in_transaction_actor.h" @@ -29,7 +30,7 @@ TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrpc::TLoggerPtr logger) { CQ_ = cq; - ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute(); + ServicesInitializer(ActorSystem_, SchemeCache, Counters_, &ClustersCfgProvider).Execute(); if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) { SetupIncomingRequests(std::move(logger)); @@ -43,6 +44,7 @@ void TGRpcTopicService::DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpC void TGRpcTopicService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { + using namespace std::placeholders; auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_); { @@ -118,27 +120,33 @@ void TGRpcTopicService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { "TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run(); ADD_REQUEST(CommitOffset, TopicService, CommitOffsetRequest, CommitOffsetResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCommitOffsetRequest(ctx)); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCommitOffsetRequest(ctx)); + }) ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDropTopicRequest(ctx, IsRlAllowed())); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new TEvDropTopicRequest(ctx, &DoDropTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(CreateTopic, TopicService, CreateTopicRequest, CreateTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCreateTopicRequest(ctx, IsRlAllowed())); - }) + auto clusterCfg = ClustersCfgProvider->GetCfg(); + ActorSystem_->Send(GRpcRequestProxyId_, new TEvCreateTopicRequest(ctx, std::bind(DoCreateTopicRequest, _1, _2, clusterCfg), TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(AlterTopic, TopicService, AlterTopicRequest, AlterTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvAlterTopicRequest(ctx, IsRlAllowed())); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new TEvAlterTopicRequest(ctx, &DoAlterTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeTopicRequest(ctx, IsRlAllowed())); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeTopicRequest(ctx, &DoDescribeTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(DescribeConsumer, TopicService, DescribeConsumerRequest, DescribeConsumerResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed())); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeConsumerRequest(ctx, &DoDescribeConsumerRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) + ADD_REQUEST(DescribePartition, TopicService, DescribePartitionRequest, DescribePartitionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribePartitionRequest(ctx, IsRlAllowed())); - }) + ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribePartitionRequest(ctx, &DoDescribePartitionRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off})); + }) #undef ADD_REQUEST #ifdef ADD_REQUEST_LIMIT diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h index d33f55ccf1..f2aef10454 100644 --- a/ydb/services/persqueue_v1/topic.h +++ b/ydb/services/persqueue_v1/topic.h @@ -8,7 +8,7 @@ #include <ydb/core/grpc_services/base/base_service.h> #include <ydb/core/grpc_services/base/base.h> - +#include <ydb/core/grpc_services/service_topic.h> namespace NKikimr::NGRpcService::V1 { @@ -32,6 +32,7 @@ private: NActors::TActorId SchemeCache; TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + NKikimr::NGRpcProxy::V1::IClustersCfgProvider* ClustersCfgProvider = nullptr; }; } // namespace NKikimr::NGRpcService::V1 |