aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2024-01-29 13:54:43 +0100
committerGitHub <noreply@github.com>2024-01-29 13:54:43 +0100
commitf883501d79b6d54d1ea61c7ac390d5f302250214 (patch)
tree4501544b1eb3544a78dbbc81ab2b017975488480
parent04c3f1c13b10986ac522030e869d903a93992b36 (diff)
downloadydb-f883501d79b6d54d1ea61c7ac390d5f302250214.tar.gz
Use runtime grpc event dispatching instead of legacy one for PQ scheme events (#1326)
-rw-r--r--ydb/core/grpc_services/base/base.h7
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp12
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy_handle_methods.h12
-rw-r--r--ydb/core/grpc_services/rpc_calls.h16
-rw-r--r--ydb/core/grpc_services/rpc_calls_topic.h24
-rw-r--r--ydb/core/grpc_services/service_topic.h32
-rw-r--r--ydb/core/viewer/json_local_rpc.h2
-rw-r--r--ydb/services/persqueue_v1/actors/events.h3
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.cpp231
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.h78
-rw-r--r--ydb/services/persqueue_v1/persqueue.cpp33
-rw-r--r--ydb/services/persqueue_v1/persqueue.h2
-rw-r--r--ydb/services/persqueue_v1/services_initializer.cpp4
-rw-r--r--ydb/services/persqueue_v1/services_initializer.h12
-rw-r--r--ydb/services/persqueue_v1/topic.cpp40
-rw-r--r--ydb/services/persqueue_v1/topic.h3
16 files changed, 269 insertions, 242 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 12272ce9af..344dbac4f8 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -1437,7 +1437,12 @@ public:
void Pass(const IFacilityProvider& facility) override {
this->Span_.End();
- PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
+ try {
+ PassMethod(std::move(std::unique_ptr<TRequestIface>(this)), facility);
+ } catch (const std::exception& ex) {
+ this->RaiseIssue(NYql::TIssue{TStringBuilder() << "unexpected exception: " << ex.what()});
+ this->ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR);
+ }
}
TRateLimiterMode GetRlMode() const override {
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 3e44414f5d..ca2d67d498 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -606,20 +606,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
HFunc(TEvStreamTopicDirectReadRequest, PreHandle);
HFunc(TEvCommitOffsetRequest, PreHandle);
HFunc(TEvPQReadInfoRequest, PreHandle);
- HFunc(TEvPQDropTopicRequest, PreHandle);
- HFunc(TEvPQCreateTopicRequest, PreHandle);
- HFunc(TEvPQAlterTopicRequest, PreHandle);
- HFunc(TEvPQAddReadRuleRequest, PreHandle);
- HFunc(TEvPQRemoveReadRuleRequest, PreHandle);
- HFunc(TEvPQDescribeTopicRequest, PreHandle);
HFunc(TEvDiscoverPQClustersRequest, PreHandle);
HFunc(TEvCoordinationSessionRequest, PreHandle);
- HFunc(TEvDropTopicRequest, PreHandle);
- HFunc(TEvCreateTopicRequest, PreHandle);
- HFunc(TEvAlterTopicRequest, PreHandle);
- HFunc(TEvDescribeTopicRequest, PreHandle);
- HFunc(TEvDescribeConsumerRequest, PreHandle);
- HFunc(TEvDescribePartitionRequest, PreHandle);
HFunc(TEvNodeCheckRequest, PreHandle);
HFunc(TEvProxyRuntimeEvent, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h
index 155f344d7a..662d37f2d4 100644
--- a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h
+++ b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h
@@ -15,22 +15,10 @@ protected:
static void Handle(TEvStreamTopicDirectReadRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvCommitOffsetRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvPQReadInfoRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvNodeCheckRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
- static void Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx);
};
}
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 0a02845f93..1f92eb4f6b 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -13,10 +13,8 @@
#include <ydb/public/api/protos/ydb_discovery.pb.h>
#include <ydb/public/api/protos/ydb_monitoring.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
-#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
-#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>
#include <ydb/public/api/grpc/draft/dummy.pb.h>
@@ -69,21 +67,7 @@ using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvSt
using TEvStreamTopicDirectReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicDirectRead, Ydb::Topic::StreamDirectReadMessage::FromClient, Ydb::Topic::StreamDirectReadMessage::FromServer, TRateLimiterMode::RuManual>;
using TEvCommitOffsetRequest = TGRpcRequestWrapper<TRpcServices::EvTopicCommitOffset, Ydb::Topic::CommitOffsetRequest, Ydb::Topic::CommitOffsetResponse, true>;
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
-using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;
-using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>;
-using TEvPQAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAlterTopic, Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse, true>;
-using TEvPQDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDescribeTopic, Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse, true>;
-using TEvPQAddReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQAddReadRule, Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse, true>;
-using TEvPQRemoveReadRuleRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQRemoveReadRule, Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse, true>;
-
//TODO: Change this to runtime dispatching!
-using TEvDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDropTopic, Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse, true, TRateLimiterMode::Rps>;
-using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTopic, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse, true, TRateLimiterMode::Rps>;
-using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>;
-using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>;
-using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>;
-using TEvDescribePartitionRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribePartition, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse, true, TRateLimiterMode::Rps>;
-
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
using TEvListFederationDatabasesRequest = TGRpcRequestWrapper<TRpcServices::EvListFederationDatabases, Ydb::FederationDiscovery::ListFederationDatabasesRequest, Ydb::FederationDiscovery::ListFederationDatabasesResponse, true>;
diff --git a/ydb/core/grpc_services/rpc_calls_topic.h b/ydb/core/grpc_services/rpc_calls_topic.h
new file mode 100644
index 0000000000..f595a3399b
--- /dev/null
+++ b/ydb/core/grpc_services/rpc_calls_topic.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <ydb/public/api/protos/ydb_topic.pb.h>
+#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
+
+#include "rpc_calls.h"
+
+namespace NKikimr::NGRpcService {
+
+using TEvDropTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DropTopicRequest, Ydb::Topic::DropTopicResponse>;
+using TEvCreateTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse>;
+using TEvAlterTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse>;
+using TEvDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse>;
+using TEvDescribeConsumerRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse>;
+using TEvDescribePartitionRequest = TGrpcRequestOperationCall<Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>;
+
+using TEvPQDropTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse>;
+using TEvPQCreateTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse>;
+using TEvPQAlterTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AlterTopicRequest, Ydb::PersQueue::V1::AlterTopicResponse>;
+using TEvPQDescribeTopicRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::DescribeTopicRequest, Ydb::PersQueue::V1::DescribeTopicResponse>;
+using TEvPQAddReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::AddReadRuleRequest, Ydb::PersQueue::V1::AddReadRuleResponse>;
+using TEvPQRemoveReadRuleRequest = TGrpcRequestOperationCall<Ydb::PersQueue::V1::RemoveReadRuleRequest, Ydb::PersQueue::V1::RemoveReadRuleResponse>;
+
+}
diff --git a/ydb/core/grpc_services/service_topic.h b/ydb/core/grpc_services/service_topic.h
new file mode 100644
index 0000000000..a99316b03a
--- /dev/null
+++ b/ydb/core/grpc_services/service_topic.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr {
+
+namespace NGRpcProxy::V1 {
+class IClustersCfgProvider;
+struct TClustersCfg;
+}
+
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+void DoCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
+void DoAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+void DoDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+void DoDescribeConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+void DoDescribePartitionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+
+void DoPQDropTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+void DoPQCreateTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
+void DoPQAlterTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f, TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>);
+void DoPQDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+void DoPQAddReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+void DoPQRemoveReadRuleRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
+
+}
+}
diff --git a/ydb/core/viewer/json_local_rpc.h b/ydb/core/viewer/json_local_rpc.h
index 13dc14ee88..6e3c54e4c5 100644
--- a/ydb/core/viewer/json_local_rpc.h
+++ b/ydb/core/viewer/json_local_rpc.h
@@ -10,7 +10,7 @@
#include "json_pipe_req.h"
#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/rpc_calls_topic.h>
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index f147836b54..953b067ad0 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -3,7 +3,7 @@
#include "partition_id.h"
#include <ydb/core/base/events.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/rpc_calls_topic.h>
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/percentile_counter.h>
@@ -15,7 +15,6 @@
#include <util/generic/guid.h>
-
namespace NKikimr::NGRpcProxy::V1 {
using namespace Ydb;
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
index 634a82be08..00e678183d 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
@@ -1,15 +1,12 @@
#include "grpc_pq_schema.h"
#include "actors/schema_actors.h"
+#include "actors/events.h"
-#include <ydb/core/grpc_services/grpc_helper.h>
-#include <ydb/core/tx/scheme_board/cache.h>
-#include <ydb/core/ydb_convert/ydb_convert.h>
-
-#include <ydb/library/persqueue/obfuscate/obfuscate.h>
-#include <ydb/library/persqueue/topic_parser/topic_parser.h>
+#include <ydb/core/persqueue/cluster_tracker.h>
#include <algorithm>
+#include <shared_mutex>
using namespace NActors;
using namespace NKikimrClient;
@@ -22,21 +19,43 @@ namespace NKikimr::NGRpcProxy::V1 {
using namespace PersQueue::V1;
+class TPQSchemaService : public NActors::TActorBootstrapped<TPQSchemaService>, IClustersCfgProvider {
+public:
+ TPQSchemaService(IClustersCfgProvider** p);
-IActor* CreatePQSchemaService(const TActorId& schemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
- return new TPQSchemaService(schemeCache, counters);
-}
+ void Bootstrap(const TActorContext& ctx);
+ TIntrusiveConstPtr<TClustersCfg> GetCfg() const override;
+private:
+ TString AvailableLocalCluster();
-TPQSchemaService::TPQSchemaService(const TActorId& schemeCache,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
- : SchemeCache(schemeCache)
- , Counters(counters)
- , LocalCluster("")
-{
+ STFUNC(StateFunc) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
+ }
+ }
+
+private:
+ void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev);
+
+ mutable std::shared_mutex Mtx;
+ TIntrusivePtr<TClustersCfg> ClustersCfg;
+};
+
+IActor* CreatePQSchemaService(IClustersCfgProvider** p) {
+ return new TPQSchemaService(p);
}
+TPQSchemaService::TPQSchemaService(IClustersCfgProvider** p)
+ : ClustersCfg(MakeIntrusive<TClustersCfg>())
+{
+ // used from grpc handlers.
+ // GetCfg method in called in the grpc thread context
+ // We have guarantee this object is created before grpc start
+ // and the the object destroyed after grpc stop
+ *p = this;
+}
void TPQSchemaService::Bootstrap(const TActorContext& ctx) {
if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { // ToDo[migration]: switch to haveClusters
@@ -49,156 +68,164 @@ void TPQSchemaService::Bootstrap(const TActorContext& ctx) {
Become(&TThis::StateFunc);
}
-
void TPQSchemaService::Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev) {
Y_ABORT_UNLESS(ev->Get()->ClustersList);
Y_ABORT_UNLESS(ev->Get()->ClustersList->Clusters.size());
const auto& clusters = ev->Get()->ClustersList->Clusters;
- LocalCluster = {};
+ auto cfg = MakeIntrusive<TClustersCfg>();
auto it = std::find_if(begin(clusters), end(clusters), [](const auto& cluster) { return cluster.IsLocal; });
if (it != end(clusters)) {
- LocalCluster = it->Name;
+ cfg->LocalCluster = it->Name;
}
- Clusters.resize(clusters.size());
+ cfg->Clusters.resize(clusters.size());
for (size_t i = 0; i < clusters.size(); ++i) {
- Clusters[i] = clusters[i].Name;
+ cfg->Clusters[i] = clusters[i].Name;
}
+
+ std::unique_lock lock(Mtx);
+ ClustersCfg = cfg;
}
-// unused ?
-// google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> FillResponse(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code) {
-// google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> res;
-// FillIssue(res.Add(), code, errorReason);
-// return res;
-// }
+TIntrusiveConstPtr<TClustersCfg> TPQSchemaService::GetCfg() const {
+ std::shared_lock lock(Mtx);
+ return ClustersCfg;
+}
+}
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
+namespace NKikimr {
+namespace NGRpcService {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new drop topic request");
+void EnsureReq(const IRequestOpCtx* ctx, const TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg>& cfg) {
+ if (Y_UNLIKELY(!ctx))
+ throw yexception() << "no req ctx after cast";
- ctx.Register(new TPQDropTopicActor(ev->Release().Release()));
+ if (Y_UNLIKELY(!cfg))
+ throw yexception() << "no cluster cfg provided";
}
+void EnsureReq(const IRequestOpCtx* ctx) {
+ if (Y_UNLIKELY(!ctx))
+ throw yexception() << "no req ctx after cast";
+}
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
+void DoDropTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvDropTopicRequest*>(ctx.release());
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new drop topic request");
+ EnsureReq(p);
- ctx.Register(new TDropTopicActor(ev->Release().Release()));
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new drop topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TDropTopicActor(p));
}
+void DoCreateTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f,
+ TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg> cfg)
+{
+ auto p = dynamic_cast<TEvCreateTopicRequest*>(ctx.release());
+ EnsureReq(p);
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Alter topic request");
- ctx.Register(new TPQAlterTopicActor(ev->Release().Release(), LocalCluster));
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new create topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TCreateTopicActor(p, cfg->LocalCluster, cfg->Clusters));
}
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Alter topic request");
- ctx.Register(new TAlterTopicActor(ev->Release().Release()));
-}
+void DoAlterTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvAlterTopicRequest*>(ctx.release());
+ EnsureReq(p);
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Add read rules request");
- ctx.Register(new TAddReadRuleActor(ev->Release().Release()));
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new alter topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TAlterTopicActor(p));
}
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Remove read rules request");
- ctx.Register(new TRemoveReadRuleActor(ev->Release().Release()));
-}
+void DoDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvDescribeTopicRequest*>(ctx.release());
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new create topic request");
- ctx.Register(new TPQCreateTopicActor(ev->Release().Release(), LocalCluster, Clusters));
-}
+ EnsureReq(p);
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new create topic request");
- ctx.Register(new TCreateTopicActor(ev->Release().Release(), LocalCluster, Clusters));
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TDescribeTopicActor(p));
}
+void DoDescribeConsumerRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvDescribeConsumerRequest*>(ctx.release());
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe topic request");
- ctx.Register(new TPQDescribeTopicActor(ev->Release().Release()));
-}
+ EnsureReq(p);
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe topic request");
- ctx.Register(new TDescribeTopicActor(ev->Release().Release()));
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe consumer request");
+ f.RegisterActor(new NGRpcProxy::V1::TDescribeConsumerActor(p));
}
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe consumer request");
- ctx.Register(new TDescribeConsumerActor(ev->Release().Release()));
-}
+void DoDescribePartitionRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvDescribePartitionRequest*>(ctx.release());
-void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe partition request");
- ctx.Register(new TDescribePartitionActor(ev->Release().Release()));
-}
+ EnsureReq(p);
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe partition request");
+ f.RegisterActor(new NGRpcProxy::V1::TDescribePartitionActor(p));
}
-namespace NKikimr {
-namespace NGRpcService {
+void DoPQDropTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const NKikimr::NGRpcService::IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvPQDropTopicRequest*>(ctx.release());
-void TGRpcRequestProxyHandleMethods::Handle(TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
-}
+ EnsureReq(p);
-void TGRpcRequestProxyHandleMethods::Handle(TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Drop topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TPQDropTopicActor(p));
}
-void TGRpcRequestProxyHandleMethods::Handle(TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
-}
+void DoPQCreateTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f,
+ TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg> cfg)
+{
+ auto p = dynamic_cast<TEvPQCreateTopicRequest*>(ctx.release());
-void TGRpcRequestProxyHandleMethods::Handle(TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
-}
+ EnsureReq(p, cfg);
-void TGRpcRequestProxyHandleMethods::Handle(TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Create topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TPQCreateTopicActor(p, cfg->LocalCluster, cfg->Clusters));
}
-void TGRpcRequestProxyHandleMethods::Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
-}
+void DoPQAlterTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f,
+ TIntrusiveConstPtr<NGRpcProxy::V1::TClustersCfg> cfg)
+{
+ auto p = dynamic_cast<TEvPQAlterTopicRequest*>(ctx.release());
-void TGRpcRequestProxyHandleMethods::Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
-}
+ EnsureReq(p, cfg);
-void TGRpcRequestProxyHandleMethods::Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Alter topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TPQAlterTopicActor(p, cfg->LocalCluster));
}
-void TGRpcRequestProxyHandleMethods::Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
-}
+void DoPQDescribeTopicRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvPQDescribeTopicRequest*>(ctx.release());
+
+ EnsureReq(p);
-void NKikimr::NGRpcService::TGRpcRequestProxyHandleMethods::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Describe topic request");
+ f.RegisterActor(new NGRpcProxy::V1::TPQDescribeTopicActor(p));
}
-void TGRpcRequestProxyHandleMethods::Handle(TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
-}
+void DoPQAddReadRuleRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvPQAddReadRuleRequest*>(ctx.release());
+
+ EnsureReq(p);
-void TGRpcRequestProxyHandleMethods::Handle(TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Add read rules request");
+ f.RegisterActor(new NGRpcProxy::V1::TAddReadRuleActor(p));
}
+void DoPQRemoveReadRuleRequest(std::unique_ptr<IRequestOpCtx> ctx, const IFacilityProvider& f) {
+ auto p = dynamic_cast<TEvPQRemoveReadRuleRequest*>(ctx.release());
+
+ EnsureReq(p);
+
+ LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PQ_READ_PROXY, "new Remove read rules request");
+ f.RegisterActor(new NGRpcProxy::V1::TRemoveReadRuleActor(p));
+}
#ifdef DECLARE_RPC
#error DECLARE_RPC macro already defined
@@ -214,7 +241,5 @@ DECLARE_RPC(DescribePartition);
#undef DECLARE_RPC
-
-
}
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h
index 02b1987179..27df9d36fd 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.h
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.h
@@ -1,74 +1,28 @@
#pragma once
-#include "actors/events.h"
-
-#include <ydb/core/client/server/grpc_base.h>
-#include <ydb/core/persqueue/cluster_tracker.h>
-
-#include <ydb/library/actors/core/actor_bootstrapped.h>
-#include <ydb/library/actors/core/actorsystem.h>
-
-#include <util/generic/hash.h>
-#include <util/system/mutex.h>
-
+#include <ydb/library/actors/core/actorid.h>
+#include <ydb/library/actors/core/actor.h>
+#include <util/generic/ptr.h>
namespace NKikimr::NGRpcProxy::V1 {
-inline TActorId GetPQSchemaServiceActorID() {
- return TActorId(0, "PQSchmSvc");
+inline NActors::TActorId GetPQSchemaServiceActorID() {
+ return NActors::TActorId(0, "PQSchmSvc");
}
-IActor* CreatePQSchemaService(const NActors::TActorId& schemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters);
+struct TClustersCfg : public TThrRefBase {
+ TClustersCfg()
+ : LocalCluster("")
+ {}
+ TString LocalCluster;
+ TVector<TString> Clusters;
+};
-class TPQSchemaService : public NActors::TActorBootstrapped<TPQSchemaService> {
+class IClustersCfgProvider {
public:
- TPQSchemaService(const NActors::TActorId& schemeCache, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters);
-
- void Bootstrap(const TActorContext& ctx);
-
-private:
- TString AvailableLocalCluster();
-
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NKikimr::NGRpcService::TEvPQDropTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvPQCreateTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvPQAlterTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvPQAddReadRuleRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvPQDescribeTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDropTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvCreateTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvAlterTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDescribeTopicRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDescribeConsumerRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDescribePartitionRequest, Handle);
- hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
- }
- }
-
-private:
- void Handle(NKikimr::NGRpcService::TEvPQDropTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvPQCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvPQAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvPQDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDropTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx);
-
- void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev);
-
- NActors::TActorId SchemeCache;
-
- TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
-
- TVector<TString> Clusters;
- TString LocalCluster;
+ virtual ~IClustersCfgProvider() = default;
+ virtual TIntrusiveConstPtr<TClustersCfg> GetCfg() const = 0;
};
+NActors::IActor* CreatePQSchemaService(IClustersCfgProvider** cfgProvider);
}
diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp
index b0a3360d53..d545d2e73e 100644
--- a/ydb/services/persqueue_v1/persqueue.cpp
+++ b/ydb/services/persqueue_v1/persqueue.cpp
@@ -2,7 +2,8 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/rpc_calls_topic.h>
+#include <ydb/core/grpc_services/service_topic.h>
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/tx/scheme_board/cache.h>
@@ -23,7 +24,7 @@ TGRpcPersQueueService::TGRpcPersQueueService(NActors::TActorSystem *system, TInt
void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrpc::TLoggerPtr logger) {
CQ_ = cq;
- ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute();
+ ServicesInitializer(ActorSystem_, SchemeCache, Counters_, &ClustersCfgProvider).Execute();
if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
SetupIncomingRequests(std::move(logger));
@@ -31,7 +32,7 @@ void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrp
}
void TGRpcPersQueueService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
-
+ using namespace std::placeholders;
auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
{
@@ -90,23 +91,29 @@ void TGRpcPersQueueService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
})
ADD_REQUEST(DropTopic, PersQueueService, DropTopicRequest, DropTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDropTopicRequest(ctx));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQDropTopicRequest(ctx, &DoPQDropTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
ADD_REQUEST(CreateTopic, PersQueueService, CreateTopicRequest, CreateTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQCreateTopicRequest(ctx));
- })
+ auto clusterCfg = ClustersCfgProvider->GetCfg();
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQCreateTopicRequest(ctx, std::bind(DoPQCreateTopicRequest, _1, _2, clusterCfg), TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(AlterTopic, PersQueueService, AlterTopicRequest, AlterTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAlterTopicRequest(ctx));
- })
+ auto clusterCfg = ClustersCfgProvider->GetCfg();
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQAlterTopicRequest(ctx, std::bind(DoPQAlterTopicRequest, _1, _2, clusterCfg), TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(DescribeTopic, PersQueueService, DescribeTopicRequest, DescribeTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQDescribeTopicRequest(ctx));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQDescribeTopicRequest(ctx, &DoPQDescribeTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(AddReadRule, PersQueueService, AddReadRuleRequest, AddReadRuleResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQAddReadRuleRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQAddReadRuleRequest(ctx, &DoPQAddReadRuleRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
})
+
ADD_REQUEST(RemoveReadRule, PersQueueService, RemoveReadRuleRequest, RemoveReadRuleResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvPQRemoveReadRuleRequest(ctx));
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvPQRemoveReadRuleRequest(ctx, &DoPQRemoveReadRuleRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
})
#undef ADD_REQUEST
diff --git a/ydb/services/persqueue_v1/persqueue.h b/ydb/services/persqueue_v1/persqueue.h
index 348be0f72e..efb6804609 100644
--- a/ydb/services/persqueue_v1/persqueue.h
+++ b/ydb/services/persqueue_v1/persqueue.h
@@ -6,6 +6,7 @@
#include <ydb/library/grpc/server/grpc_server.h>
#include <ydb/core/grpc_services/base/base_service.h>
+#include <ydb/core/grpc_services/service_topic.h>
namespace NKikimr {
@@ -28,6 +29,7 @@ private:
void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) override;
NActors::TActorId SchemeCache;
+ NKikimr::NGRpcProxy::V1::IClustersCfgProvider* ClustersCfgProvider = nullptr;
};
} // namespace V1
diff --git a/ydb/services/persqueue_v1/services_initializer.cpp b/ydb/services/persqueue_v1/services_initializer.cpp
index afb18f384b..2a80ee6b00 100644
--- a/ydb/services/persqueue_v1/services_initializer.cpp
+++ b/ydb/services/persqueue_v1/services_initializer.cpp
@@ -58,10 +58,12 @@ void ServicesInitializer::InitReadService() {
void ServicesInitializer::InitSchemaService() {
auto serviceId = NGRpcProxy::V1::GetPQSchemaServiceActorID();
+ static NGRpcProxy::V1::IClustersCfgProvider* providerService;
if (RequiresServiceRegistration(serviceId)) {
- auto service = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters);
+ auto service = NGRpcProxy::V1::CreatePQSchemaService(&providerService);
RegisterService(serviceId, service);
}
+ *ClusterCfgProvider = providerService;
}
} // namespace V1
diff --git a/ydb/services/persqueue_v1/services_initializer.h b/ydb/services/persqueue_v1/services_initializer.h
index 1d604f35a9..938d1d8d40 100644
--- a/ydb/services/persqueue_v1/services_initializer.h
+++ b/ydb/services/persqueue_v1/services_initializer.h
@@ -3,6 +3,11 @@
#include <ydb/library/actors/core/actorsystem.h>
namespace NKikimr {
+
+namespace NGRpcProxy::V1 {
+class IClustersCfgProvider;
+}
+
namespace NGRpcService {
namespace V1 {
@@ -10,10 +15,12 @@ class ServicesInitializer {
public:
ServicesInitializer(NActors::TActorSystem* actorSystem,
NActors::TActorId schemeCache,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ NGRpcProxy::V1::IClustersCfgProvider** p)
: ActorSystem(actorSystem)
, SchemeCache(schemeCache)
- , Counters(counters) {
+ , Counters(counters)
+ , ClusterCfgProvider(p) {
}
void Execute();
@@ -31,6 +38,7 @@ private:
NActors::TActorSystem* ActorSystem;
NActors::TActorId SchemeCache;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+ NGRpcProxy::V1::IClustersCfgProvider** ClusterCfgProvider;
};
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index 40a7f2c65a..a1a697c651 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -2,9 +2,10 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/rpc_calls_topic.h>
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/grpc_services/service_table.h>
+#include <ydb/core/grpc_services/service_topic.h>
#include <ydb/core/tx/scheme_board/cache.h>
#include "actors/update_offsets_in_transaction_actor.h"
@@ -29,7 +30,7 @@ TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system,
void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NYdbGrpc::TLoggerPtr logger) {
CQ_ = cq;
- ServicesInitializer(ActorSystem_, SchemeCache, Counters_).Execute();
+ ServicesInitializer(ActorSystem_, SchemeCache, Counters_, &ClustersCfgProvider).Execute();
if (ActorSystem_->AppData<TAppData>()->PQConfig.GetEnabled()) {
SetupIncomingRequests(std::move(logger));
@@ -43,6 +44,7 @@ void TGRpcTopicService::DoUpdateOffsetsInTransaction(std::unique_ptr<IRequestOpC
void TGRpcTopicService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
+ using namespace std::placeholders;
auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters_, ActorSystem_);
{
@@ -118,27 +120,33 @@ void TGRpcTopicService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
"TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run();
ADD_REQUEST(CommitOffset, TopicService, CommitOffsetRequest, CommitOffsetResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCommitOffsetRequest(ctx));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCommitOffsetRequest(ctx));
+ })
ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDropTopicRequest(ctx, IsRlAllowed()));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvDropTopicRequest(ctx, &DoDropTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(CreateTopic, TopicService, CreateTopicRequest, CreateTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvCreateTopicRequest(ctx, IsRlAllowed()));
- })
+ auto clusterCfg = ClustersCfgProvider->GetCfg();
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvCreateTopicRequest(ctx, std::bind(DoCreateTopicRequest, _1, _2, clusterCfg), TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(AlterTopic, TopicService, AlterTopicRequest, AlterTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvAlterTopicRequest(ctx, IsRlAllowed()));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvAlterTopicRequest(ctx, &DoAlterTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeTopicRequest(ctx, IsRlAllowed()));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeTopicRequest(ctx, &DoDescribeTopicRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(DescribeConsumer, TopicService, DescribeConsumerRequest, DescribeConsumerResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed()));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeConsumerRequest(ctx, &DoDescribeConsumerRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
+
ADD_REQUEST(DescribePartition, TopicService, DescribePartitionRequest, DescribePartitionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribePartitionRequest(ctx, IsRlAllowed()));
- })
+ ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribePartitionRequest(ctx, &DoDescribePartitionRequest, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Off}));
+ })
#undef ADD_REQUEST
#ifdef ADD_REQUEST_LIMIT
diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h
index d33f55ccf1..f2aef10454 100644
--- a/ydb/services/persqueue_v1/topic.h
+++ b/ydb/services/persqueue_v1/topic.h
@@ -8,7 +8,7 @@
#include <ydb/core/grpc_services/base/base_service.h>
#include <ydb/core/grpc_services/base/base.h>
-
+#include <ydb/core/grpc_services/service_topic.h>
namespace NKikimr::NGRpcService::V1 {
@@ -32,6 +32,7 @@ private:
NActors::TActorId SchemeCache;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+ NKikimr::NGRpcProxy::V1::IClustersCfgProvider* ClustersCfgProvider = nullptr;
};
} // namespace NKikimr::NGRpcService::V1