diff options
author | ildar-khisam <ildar-khisam@yandex-team.ru> | 2022-06-20 16:37:01 +0300 |
---|---|---|
committer | ildar-khisam <ildar-khisam@yandex-team.ru> | 2022-06-20 16:37:01 +0300 |
commit | 3f7bfb1f7dc93608c92f476f0f2e7a1de7ec0c8b (patch) | |
tree | 4f93e68876306eebf0939f0b285b1e9e14951438 | |
parent | 322aff00242467b34829b98a0f46e750f1e6fbb7 (diff) | |
download | ydb-3f7bfb1f7dc93608c92f476f0f2e7a1de7ec0c8b.tar.gz |
implement topic service
set up topic service
ref:4d99d21c8de6edda5a82e7d72f8341c33dc7f372
-rw-r--r-- | ydb/core/driver_lib/run/run.cpp | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls.cpp | 12 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls.h | 10 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 2 | ||||
-rw-r--r-- | ydb/public/api/grpc/draft/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/public/api/grpc/draft/ydb_topic_v1.proto | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/topic.cpp | 149 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/topic.h | 47 |
10 files changed, 226 insertions, 1 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 7abdb5a8b3..a2f904b9c9 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -98,6 +98,7 @@ #include <ydb/services/ydb/ydb_logstore.h> #include <ydb/services/persqueue_cluster_discovery/grpc_service.h> #include <ydb/services/persqueue_v1/persqueue.h> +#include <ydb/services/persqueue_v1/topic.h> #include <ydb/services/rate_limiter/grpc_service.h> #include <ydb/services/discovery/grpc_service.h> #include <ydb/services/local_discovery/grpc_service.h> @@ -686,6 +687,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasPQv1) { server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); + server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); } if (hasPQCD) { diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 4aa27a5ba4..989b57917b 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -102,6 +102,8 @@ struct TRpcServices { EvStreamPQWrite, EvStreamPQRead, EvStreamPQMigrationRead, + EvStreamTopicWrite, + EvStreamTopicRead, EvPQReadInfo, EvListOperations, EvExportToYt, diff --git a/ydb/core/grpc_services/rpc_calls.cpp b/ydb/core/grpc_services/rpc_calls.cpp index 854123eed6..0dae748213 100644 --- a/ydb/core/grpc_services/rpc_calls.cpp +++ b/ydb/core/grpc_services/rpc_calls.cpp @@ -25,6 +25,18 @@ void FillYdbStatus(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage& resp } template <> +void FillYdbStatus(Ydb::Topic::StreamWriteMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { + resp.set_status(status); + NYql::IssuesToMessage(issues, resp.mutable_issues()); +} + +template <> +void FillYdbStatus(Ydb::Topic::StreamReadMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { + resp.set_status(status); + NYql::IssuesToMessage(issues, resp.mutable_issues()); +} + +template <> void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) { Y_UNUSED(resp); Y_UNUSED(issues); diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 91243d5d8e..6af89f307d 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -16,6 +16,7 @@ #include <ydb/public/api/protos/ydb_s3_internal.pb.h> #include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h> #include <ydb/public/api/protos/ydb_persqueue_v1.pb.h> +#include <ydb/public/api/protos/ydb_topic.pb.h> #include <ydb/public/api/protos/yq.pb.h> @@ -38,6 +39,12 @@ template <> void FillYdbStatus(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status); template <> +void FillYdbStatus(Ydb::Topic::StreamWriteMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status); + +template <> +void FillYdbStatus(Ydb::Topic::StreamReadMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status); + +template <> void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status); template <> @@ -48,10 +55,11 @@ using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoint using TEvS3ListingRequest = TGRpcRequestWrapper<TRpcServices::EvS3Listing, Ydb::S3Internal::S3ListingRequest, Ydb::S3Internal::S3ListingResponse, true>; using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStreamPing, Draft::Dummy::PingRequest, Draft::Dummy::PingResponse>; using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>; - using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>; using TEvStreamPQReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQRead, Ydb::PersQueue::V1::StreamingReadClientMessage, Ydb::PersQueue::V1::StreamingReadServerMessage>; using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; +using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer>; +using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>; using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>; using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>; using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>; diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index a9fb212f6a..309c4977e2 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -28,6 +28,7 @@ #include <ydb/services/rate_limiter/grpc_service.h> #include <ydb/services/persqueue_cluster_discovery/grpc_service.h> #include <ydb/services/persqueue_v1/persqueue.h> +#include <ydb/services/persqueue_v1/topic.h> #include <ydb/services/persqueue_v1/grpc_pq_write.h> #include <ydb/services/yq/grpc_service.h> #include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h> @@ -311,6 +312,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); + GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId)); diff --git a/ydb/public/api/grpc/draft/CMakeLists.txt b/ydb/public/api/grpc/draft/CMakeLists.txt index 8cb8f24ea2..2093014f61 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.txt @@ -23,6 +23,7 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/dummy.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_clickhouse_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto + ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_experimental_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_s3_internal_v1.proto diff --git a/ydb/public/api/grpc/draft/ydb_topic_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_v1.proto index 6203b20834..d119b3b459 100644 --- a/ydb/public/api/grpc/draft/ydb_topic_v1.proto +++ b/ydb/public/api/grpc/draft/ydb_topic_v1.proto @@ -80,3 +80,4 @@ service TopicService { // Drop topic command. rpc DropTopic(DropTopicRequest) returns (DropTopicResponse); +} diff --git a/ydb/services/persqueue_v1/CMakeLists.txt b/ydb/services/persqueue_v1/CMakeLists.txt index aebb6700e1..33f80d3e64 100644 --- a/ydb/services/persqueue_v1/CMakeLists.txt +++ b/ydb/services/persqueue_v1/CMakeLists.txt @@ -37,4 +37,5 @@ target_sources(ydb-services-persqueue_v1 PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp ) diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp new file mode 100644 index 0000000000..75e9a99e10 --- /dev/null +++ b/ydb/services/persqueue_v1/topic.cpp @@ -0,0 +1,149 @@ +#include "topic.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/counters.h> +#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/tx/scheme_board/cache.h> + +#include "grpc_pq_read.h" +#include "grpc_pq_write.h" +#include "grpc_pq_schema.h" + +namespace NKikimr { +namespace NGRpcService { +namespace V1 { + +static const ui32 TopicWriteSessionsMaxCount = 1000000; +static const ui32 TopicReadSessionsMaxCount = 100000; + +TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy) + : ActorSystem(system) + , Counters(counters) + , SchemeCache(schemeCache) + , GRpcRequestProxy(grpcRequestProxy) +{ } + +void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { + CQ = cq; + InitNewSchemeCacheActor(); + + if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) { + + IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, TopicWriteSessionsMaxCount); + TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); + ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); + + IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, TopicReadSessionsMaxCount); + actorId = ActorSystem->Register(readSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); + ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId); + + IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters); + actorId = ActorSystem->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); + ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId); + + SetupIncomingRequests(std::move(logger)); + } +} + +void TGRpcTopicService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { + Limiter = limiter; +} + +bool TGRpcTopicService::IncRequest() { + return Limiter->Inc(); +} + +void TGRpcTopicService::DecRequest() { + Limiter->Dec(); +} + +void TGRpcTopicService::InitNewSchemeCacheActor() { + auto appData = ActorSystem->AppData<TAppData>(); + auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache"); + auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); + NewSchemeCache = ActorSystem->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), + TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); +} + +void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { + + auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem); + + { + using TBiRequest = Ydb::Topic::StreamWriteMessage::FromClient; + + using TBiResponse = Ydb::Topic::StreamWriteMessage::FromServer; + + using TStreamGRpcRequest = NGRpcServer::TGRpcStreamingRequest< + TBiRequest, + TBiResponse, + TGRpcTopicService, + NKikimrServices::GRPC_SERVER>; + + + TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamWrite, + [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) { + ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicWriteRequest(context)); + }, + *ActorSystem, "TopicService/CreateWriteSession", getCounterBlock("topic", "WriteSession", true, true), nullptr + ); + } + + { + using TBiRequest = Ydb::Topic::StreamReadMessage::FromClient; + + using TBiResponse = Ydb::Topic::StreamReadMessage::FromServer; + + using TStreamGRpcRequest = NGRpcServer::TGRpcStreamingRequest< + TBiRequest, + TBiResponse, + TGRpcTopicService, + NKikimrServices::GRPC_SERVER>; + + + TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamRead, + [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) { + ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicReadRequest(context)); + }, + *ActorSystem, "TopicService/CreateReadSession", getCounterBlock("topic", "ReadSession", true, true), nullptr + ); + } + +#ifdef ADD_REQUEST +#error ADD_REQUEST macro already defined +#endif +#define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \ + MakeIntrusive<TGRpcRequest<Ydb::Topic::IN, Ydb::Topic::OUT, NGRpcService::V1::TGRpcTopicService>>(this, this->GetService(), CQ, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \ + ACTION; \ + }, &Ydb::Topic::V1::SVC::AsyncService::Request ## NAME, \ + "TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run(); + + ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, { + ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDropTopicRequest(ctx)); + }) + + ADD_REQUEST(CreateTopic, TopicService, CreateTopicRequest, CreateTopicResponse, { + ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQCreateTopicRequest(ctx)); + }) + ADD_REQUEST(AlterTopic, TopicService, AlterTopicRequest, AlterTopicResponse, { + ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQAlterTopicRequest(ctx)); + }) + ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, { + ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDescribeTopicRequest(ctx)); + }) + +#undef ADD_REQUEST + + +} + +void TGRpcTopicService::StopService() noexcept { + TGrpcServiceBase::StopService(); +} + +} // V1 +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h new file mode 100644 index 0000000000..7f516844b2 --- /dev/null +++ b/ydb/services/persqueue_v1/topic.h @@ -0,0 +1,47 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> + +#include <ydb/public/api/grpc/draft/ydb_topic_v1.grpc.pb.h> + +#include <library/cpp/grpc/server/grpc_server.h> + + +namespace NKikimr { + +namespace NGRpcService { +namespace V1 { + +class TGRpcTopicService + : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService> +{ +public: + TGRpcTopicService(NActors::TActorSystem* system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy); + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + void StopService() noexcept override; + + using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>::GetService; + + bool IncRequest(); + void DecRequest(); + +private: + void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + + void InitNewSchemeCacheActor(); + + NActors::TActorSystem* ActorSystem; + grpc::ServerCompletionQueue* CQ = nullptr; + + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; + NGrpc::TGlobalLimiter* Limiter = nullptr; + NActors::TActorId SchemeCache; + NActors::TActorId NewSchemeCache; + NActors::TActorId GRpcRequestProxy; +}; + +} // namespace V1 +} // namespace NGRpcService +} // namespace NKikimr |