aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ildar-khisam@yandex-team.ru>2022-06-20 16:37:01 +0300
committerildar-khisam <ildar-khisam@yandex-team.ru>2022-06-20 16:37:01 +0300
commit3f7bfb1f7dc93608c92f476f0f2e7a1de7ec0c8b (patch)
tree4f93e68876306eebf0939f0b285b1e9e14951438
parent322aff00242467b34829b98a0f46e750f1e6fbb7 (diff)
downloadydb-3f7bfb1f7dc93608c92f476f0f2e7a1de7ec0c8b.tar.gz
implement topic service
set up topic service ref:4d99d21c8de6edda5a82e7d72f8341c33dc7f372
-rw-r--r--ydb/core/driver_lib/run/run.cpp2
-rw-r--r--ydb/core/grpc_services/base/base.h2
-rw-r--r--ydb/core/grpc_services/rpc_calls.cpp12
-rw-r--r--ydb/core/grpc_services/rpc_calls.h10
-rw-r--r--ydb/core/testlib/test_client.cpp2
-rw-r--r--ydb/public/api/grpc/draft/CMakeLists.txt1
-rw-r--r--ydb/public/api/grpc/draft/ydb_topic_v1.proto1
-rw-r--r--ydb/services/persqueue_v1/CMakeLists.txt1
-rw-r--r--ydb/services/persqueue_v1/topic.cpp149
-rw-r--r--ydb/services/persqueue_v1/topic.h47
10 files changed, 226 insertions, 1 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 7abdb5a8b3..a2f904b9c9 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -98,6 +98,7 @@
#include <ydb/services/ydb/ydb_logstore.h>
#include <ydb/services/persqueue_cluster_discovery/grpc_service.h>
#include <ydb/services/persqueue_v1/persqueue.h>
+#include <ydb/services/persqueue_v1/topic.h>
#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/discovery/grpc_service.h>
#include <ydb/services/local_discovery/grpc_service.h>
@@ -686,6 +687,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
if (hasPQv1) {
server.AddService(new NGRpcService::V1::TGRpcPersQueueService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
+ server.AddService(new NGRpcService::V1::TGRpcTopicService(ActorSystem.Get(), Counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
}
if (hasPQCD) {
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 4aa27a5ba4..989b57917b 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -102,6 +102,8 @@ struct TRpcServices {
EvStreamPQWrite,
EvStreamPQRead,
EvStreamPQMigrationRead,
+ EvStreamTopicWrite,
+ EvStreamTopicRead,
EvPQReadInfo,
EvListOperations,
EvExportToYt,
diff --git a/ydb/core/grpc_services/rpc_calls.cpp b/ydb/core/grpc_services/rpc_calls.cpp
index 854123eed6..0dae748213 100644
--- a/ydb/core/grpc_services/rpc_calls.cpp
+++ b/ydb/core/grpc_services/rpc_calls.cpp
@@ -25,6 +25,18 @@ void FillYdbStatus(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage& resp
}
template <>
+void FillYdbStatus(Ydb::Topic::StreamWriteMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) {
+ resp.set_status(status);
+ NYql::IssuesToMessage(issues, resp.mutable_issues());
+}
+
+template <>
+void FillYdbStatus(Ydb::Topic::StreamReadMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) {
+ resp.set_status(status);
+ NYql::IssuesToMessage(issues, resp.mutable_issues());
+}
+
+template <>
void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status) {
Y_UNUSED(resp);
Y_UNUSED(issues);
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 91243d5d8e..6af89f307d 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -16,6 +16,7 @@
#include <ydb/public/api/protos/ydb_s3_internal.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_cluster_discovery.pb.h>
#include <ydb/public/api/protos/ydb_persqueue_v1.pb.h>
+#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/public/api/protos/yq.pb.h>
@@ -38,6 +39,12 @@ template <>
void FillYdbStatus(Ydb::PersQueue::V1::MigrationStreamingReadServerMessage& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status);
template <>
+void FillYdbStatus(Ydb::Topic::StreamWriteMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status);
+
+template <>
+void FillYdbStatus(Ydb::Topic::StreamReadMessage::FromServer& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status);
+
+template <>
void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status);
template <>
@@ -48,10 +55,11 @@ using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoint
using TEvS3ListingRequest = TGRpcRequestWrapper<TRpcServices::EvS3Listing, Ydb::S3Internal::S3ListingRequest, Ydb::S3Internal::S3ListingResponse, true>;
using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStreamPing, Draft::Dummy::PingRequest, Draft::Dummy::PingResponse>;
using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>;
-
using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>;
using TEvStreamPQReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQRead, Ydb::PersQueue::V1::StreamingReadClientMessage, Ydb::PersQueue::V1::StreamingReadServerMessage>;
using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
+using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer>;
+using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer>;
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;
using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>;
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index a9fb212f6a..309c4977e2 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -28,6 +28,7 @@
#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/persqueue_cluster_discovery/grpc_service.h>
#include <ydb/services/persqueue_v1/persqueue.h>
+#include <ydb/services/persqueue_v1/topic.h>
#include <ydb/services/persqueue_v1/grpc_pq_write.h>
#include <ydb/services/yq/grpc_service.h>
#include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h>
@@ -311,6 +312,7 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcYdbScriptingService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
+ GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId));
diff --git a/ydb/public/api/grpc/draft/CMakeLists.txt b/ydb/public/api/grpc/draft/CMakeLists.txt
index 8cb8f24ea2..2093014f61 100644
--- a/ydb/public/api/grpc/draft/CMakeLists.txt
+++ b/ydb/public/api/grpc/draft/CMakeLists.txt
@@ -23,6 +23,7 @@ target_proto_messages(api-grpc-draft PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/dummy.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_clickhouse_internal_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_persqueue_v1.proto
+ ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_datastreams_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_experimental_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_s3_internal_v1.proto
diff --git a/ydb/public/api/grpc/draft/ydb_topic_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
index 6203b20834..d119b3b459 100644
--- a/ydb/public/api/grpc/draft/ydb_topic_v1.proto
+++ b/ydb/public/api/grpc/draft/ydb_topic_v1.proto
@@ -80,3 +80,4 @@ service TopicService {
// Drop topic command.
rpc DropTopic(DropTopicRequest) returns (DropTopicResponse);
+}
diff --git a/ydb/services/persqueue_v1/CMakeLists.txt b/ydb/services/persqueue_v1/CMakeLists.txt
index aebb6700e1..33f80d3e64 100644
--- a/ydb/services/persqueue_v1/CMakeLists.txt
+++ b/ydb/services/persqueue_v1/CMakeLists.txt
@@ -37,4 +37,5 @@ target_sources(ydb-services-persqueue_v1 PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/grpc_pq_write.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/topic.cpp
)
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
new file mode 100644
index 0000000000..75e9a99e10
--- /dev/null
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -0,0 +1,149 @@
+#include "topic.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/counters.h>
+#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/tx/scheme_board/cache.h>
+
+#include "grpc_pq_read.h"
+#include "grpc_pq_write.h"
+#include "grpc_pq_schema.h"
+
+namespace NKikimr {
+namespace NGRpcService {
+namespace V1 {
+
+static const ui32 TopicWriteSessionsMaxCount = 1000000;
+static const ui32 TopicReadSessionsMaxCount = 100000;
+
+TGRpcTopicService::TGRpcTopicService(NActors::TActorSystem *system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache,const NActors::TActorId& grpcRequestProxy)
+ : ActorSystem(system)
+ , Counters(counters)
+ , SchemeCache(schemeCache)
+ , GRpcRequestProxy(grpcRequestProxy)
+{ }
+
+void TGRpcTopicService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
+ CQ = cq;
+ InitNewSchemeCacheActor();
+
+ if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) {
+
+ IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, TopicWriteSessionsMaxCount);
+ TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
+ ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);
+
+ IActor* readSvc = NGRpcProxy::V1::CreatePQReadService(SchemeCache, NewSchemeCache, Counters, TopicReadSessionsMaxCount);
+ actorId = ActorSystem->Register(readSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
+ ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQReadServiceActorID(), actorId);
+
+ IActor* schemaSvc = NGRpcProxy::V1::CreatePQSchemaService(SchemeCache, Counters);
+ actorId = ActorSystem->Register(schemaSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
+ ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQSchemaServiceActorID(), actorId);
+
+ SetupIncomingRequests(std::move(logger));
+ }
+}
+
+void TGRpcTopicService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
+ Limiter = limiter;
+}
+
+bool TGRpcTopicService::IncRequest() {
+ return Limiter->Inc();
+}
+
+void TGRpcTopicService::DecRequest() {
+ Limiter->Dec();
+}
+
+void TGRpcTopicService::InitNewSchemeCacheActor() {
+ auto appData = ActorSystem->AppData<TAppData>();
+ auto cacheCounters = GetServiceCounters(Counters, "pqproxy|schemecache");
+ auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
+ NewSchemeCache = ActorSystem->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
+ TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
+}
+
+void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+
+ auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem);
+
+ {
+ using TBiRequest = Ydb::Topic::StreamWriteMessage::FromClient;
+
+ using TBiResponse = Ydb::Topic::StreamWriteMessage::FromServer;
+
+ using TStreamGRpcRequest = NGRpcServer::TGRpcStreamingRequest<
+ TBiRequest,
+ TBiResponse,
+ TGRpcTopicService,
+ NKikimrServices::GRPC_SERVER>;
+
+
+ TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamWrite,
+ [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) {
+ ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicWriteRequest(context));
+ },
+ *ActorSystem, "TopicService/CreateWriteSession", getCounterBlock("topic", "WriteSession", true, true), nullptr
+ );
+ }
+
+ {
+ using TBiRequest = Ydb::Topic::StreamReadMessage::FromClient;
+
+ using TBiResponse = Ydb::Topic::StreamReadMessage::FromServer;
+
+ using TStreamGRpcRequest = NGRpcServer::TGRpcStreamingRequest<
+ TBiRequest,
+ TBiResponse,
+ TGRpcTopicService,
+ NKikimrServices::GRPC_SERVER>;
+
+
+ TStreamGRpcRequest::Start(this, this->GetService(), CQ, &Ydb::Topic::V1::TopicService::AsyncService::RequestStreamRead,
+ [this](TIntrusivePtr<TStreamGRpcRequest::IContext> context) {
+ ActorSystem->Send(GRpcRequestProxy, new NKikimr::NGRpcService::TEvStreamTopicReadRequest(context));
+ },
+ *ActorSystem, "TopicService/CreateReadSession", getCounterBlock("topic", "ReadSession", true, true), nullptr
+ );
+ }
+
+#ifdef ADD_REQUEST
+#error ADD_REQUEST macro already defined
+#endif
+#define ADD_REQUEST(NAME, SVC, IN, OUT, ACTION) \
+ MakeIntrusive<TGRpcRequest<Ydb::Topic::IN, Ydb::Topic::OUT, NGRpcService::V1::TGRpcTopicService>>(this, this->GetService(), CQ, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \
+ ACTION; \
+ }, &Ydb::Topic::V1::SVC::AsyncService::Request ## NAME, \
+ "TopicService/"#NAME, logger, getCounterBlock("topic", #NAME))->Run();
+
+ ADD_REQUEST(DropTopic, TopicService, DropTopicRequest, DropTopicResponse, {
+ ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDropTopicRequest(ctx));
+ })
+
+ ADD_REQUEST(CreateTopic, TopicService, CreateTopicRequest, CreateTopicResponse, {
+ ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQCreateTopicRequest(ctx));
+ })
+ ADD_REQUEST(AlterTopic, TopicService, AlterTopicRequest, AlterTopicResponse, {
+ ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQAlterTopicRequest(ctx));
+ })
+ ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, {
+ ActorSystem->Send(GRpcRequestProxy, new NGRpcService::TEvPQDescribeTopicRequest(ctx));
+ })
+
+#undef ADD_REQUEST
+
+
+}
+
+void TGRpcTopicService::StopService() noexcept {
+ TGrpcServiceBase::StopService();
+}
+
+} // V1
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h
new file mode 100644
index 0000000000..7f516844b2
--- /dev/null
+++ b/ydb/services/persqueue_v1/topic.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+
+#include <ydb/public/api/grpc/draft/ydb_topic_v1.grpc.pb.h>
+
+#include <library/cpp/grpc/server/grpc_server.h>
+
+
+namespace NKikimr {
+
+namespace NGRpcService {
+namespace V1 {
+
+class TGRpcTopicService
+ : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>
+{
+public:
+ TGRpcTopicService(NActors::TActorSystem* system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const NActors::TActorId& schemeCache, const NActors::TActorId& grpcRequestProxy);
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+ void StopService() noexcept override;
+
+ using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicService>::GetService;
+
+ bool IncRequest();
+ void DecRequest();
+
+private:
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+
+ void InitNewSchemeCacheActor();
+
+ NActors::TActorSystem* ActorSystem;
+ grpc::ServerCompletionQueue* CQ = nullptr;
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
+ NGrpc::TGlobalLimiter* Limiter = nullptr;
+ NActors::TActorId SchemeCache;
+ NActors::TActorId NewSchemeCache;
+ NActors::TActorId GRpcRequestProxy;
+};
+
+} // namespace V1
+} // namespace NGRpcService
+} // namespace NKikimr