diff options
author | alexnick <alexnick@ydb.tech> | 2022-11-28 16:51:29 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-11-28 16:51:29 +0300 |
commit | be7707450e36daf4e717fc52ecb58a870f45a705 (patch) | |
tree | 2a693fe67656884d1ef021ad5d482ba638683a5e | |
parent | 00e9a853d87f80d1c24085555b7d06cc56112d99 (diff) | |
download | ydb-be7707450e36daf4e717fc52ecb58a870f45a705.tar.gz |
topic describe stats
progress
progress
progress
progress
progress
progress
progress
progress
proto
23 files changed, 966 insertions, 78 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index ab69d7beb16..722ed130213 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -133,6 +133,7 @@ struct TRpcServices { EvCreateTopic, EvAlterTopic, EvDescribeTopic, + EvDescribeConsumer, EvGetDiskSpaceUsage, EvStopServingDatabase, EvCoordinationSession, diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 2cacc71877d..2976a3dd594 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -541,8 +541,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvCreateTopicRequest, PreHandle); HFunc(TEvAlterTopicRequest, PreHandle); HFunc(TEvDescribeTopicRequest, PreHandle); + HFunc(TEvDescribeConsumerRequest, PreHandle); HFunc(TEvNodeCheckRequest, PreHandle); - HFunc(TEvProxyRuntimeEvent, PreHandle); default: diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index c61c15fb7aa..4ff52e8e3a8 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -71,6 +71,7 @@ protected: void Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx); TActorId DiscoveryCacheActorID; }; diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 7614381de74..b8f318c4561 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -68,6 +68,7 @@ using TEvDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDropTo using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTopic, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse, true, TRateLimiterMode::Rps>; using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>; using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>; +using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>; using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>; diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index 5ea9600c343..15a7964fd85 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -81,7 +81,11 @@ struct TEvPersQueue { struct TEvGetReadSessionsInfo: public TEventPB<TEvGetReadSessionsInfo, NKikimrPQ::TGetReadSessionsInfo, EvGetReadSessionsInfo> { - TEvGetReadSessionsInfo() {} + TEvGetReadSessionsInfo(const TString& consumer = "") { + if (!consumer.empty()) { + Record.SetClientId(consumer); + } + } }; struct TEvReadSessionsInfoResponse: public TEventPB<TEvReadSessionsInfoResponse, @@ -125,9 +129,11 @@ struct TEvPersQueue { struct TEvStatus : public TEventPB<TEvStatus, NKikimrPQ::TStatus, EvStatus> { - explicit TEvStatus(const TString& consumer = "") { + explicit TEvStatus(const TString& consumer = "", bool getStatForAllConsumers = false) { if (!consumer.empty()) Record.SetClientId(consumer); + if (getStatForAllConsumers) + Record.SetGetStatForAllConsumers(true); } }; diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index afc8bf66514..6d60b347d52 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -310,13 +310,15 @@ struct TEvPQ { }; struct TEvPartitionStatus : public TEventLocal<TEvPartitionStatus, EvPartitionStatus> { - explicit TEvPartitionStatus(const TActorId& sender, const TString& clientId) + explicit TEvPartitionStatus(const TActorId& sender, const TString& clientId, bool getStatForAllConsumers) : Sender(sender) , ClientId(clientId) + , GetStatForAllConsumers(getStatForAllConsumers) {} TActorId Sender; TString ClientId; + bool GetStatForAllConsumers; }; struct TEvPartitionStatusResponse : public TEventLocal<TEvPartitionStatusResponse, EvPartitionStatusResponse> { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index eca824c0dd4..1e203bce1f1 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2094,6 +2094,22 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext ui64 totalLag = clientInfo->GetReadLagMs() + userInfo.GetWriteLagMs() + (ctx.Now() - userInfo.GetReadTimestamp()).MilliSeconds(); clientInfo->SetTotalLagMs(totalLag); } + + if (ev->Get()->GetStatForAllConsumers) { //fill lags + auto* clientInfo = result.AddConsumerResult(); + clientInfo->SetConsumer(userInfo.User); + auto readTimestamp = (userInfo.GetReadWriteTimestamp() ? userInfo.GetReadWriteTimestamp() : GetWriteTimeEstimate(userInfo.GetReadOffset())).MilliSeconds(); + clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset + ? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(readTimestamp)).MilliSeconds() + : 0); + clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds()); + clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs()); + + clientInfo->SetAvgReadSpeedPerMin(userInfo.AvgReadBytes[1].GetValue()); + clientInfo->SetAvgReadSpeedPerHour(userInfo.AvgReadBytes[2].GetValue()); + clientInfo->SetAvgReadSpeedPerDay(userInfo.AvgReadBytes[3].GetValue()); + } + } result.SetAvgReadSpeedPerSec(resSpeed[0]); result.SetAvgReadSpeedPerMin(resSpeed[1]); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 8ea588c752d..d78a6cc03b7 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1422,7 +1422,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& for (auto& p : Partitions) { if (!p.second.InitDone) continue; - THolder<TEvPQ::TEvPartitionStatus> event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : ""); + THolder<TEvPQ::TEvPartitionStatus> event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "", + ev->Get()->Record.HasGetStatForAllConsumers() ? ev->Get()->Record.GetGetStatForAllConsumers() : false); ctx.Send(p.second.Actor, event.Release()); } } diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 7b183f066b1..89676eef2ea 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -904,11 +904,13 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr& pi->SetProxyNodeId(jt->second.ProxyNodeId); pi->SetSession(jt->second.Session); pi->SetTimestamp(jt->second.Timestamp.Seconds()); + pi->SetTimestampMs(jt->second.Timestamp.MilliSeconds()); } else { pi->SetClientNode(""); pi->SetProxyNodeId(0); pi->SetSession(""); pi->SetTimestamp(0); + pi->SetTimestampMs(0); } } for (auto& s : c.second.SessionsInfo) { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 0e3b900812f..2345a6b759b 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -470,7 +470,6 @@ message TReadSessionStatusResponse { optional string ClientNode = 6; optional uint32 ProxyNodeId = 7; - } @@ -481,6 +480,7 @@ message TReadSessionsInfoResponse { optional uint32 ProxyNodeId = 3; optional string Session = 4; optional uint64 Timestamp = 5; + optional uint64 TimestampMs = 6; } repeated TPartitionInfo PartitionInfo = 1; optional uint64 TabletId = 2; @@ -587,6 +587,7 @@ message TOffsetsResponse { message TStatus { optional string ClientId = 1; + optional bool GetStatForAllConsumers = 2; } message TClientPosition { @@ -662,6 +663,20 @@ message TStatusResponse { optional int64 SourceIdRetentionPeriodSec = 28; repeated TErrorMessage Errors = 29; + + repeated TConsumerResult ConsumerResult = 30; + } + + message TConsumerResult { + optional string Consumer = 1; + + optional int64 AvgReadSpeedPerMin = 2; + optional int64 AvgReadSpeedPerHour = 3; + optional int64 AvgReadSpeedPerDay = 4; + + optional uint64 WriteLagMs = 5; + optional uint64 ReadLagMs = 6; + optional uint64 LastReadTimestampMs = 7; } optional uint64 TabletId = 1; diff --git a/ydb/public/api/grpc/ydb_topic_v1.proto b/ydb/public/api/grpc/ydb_topic_v1.proto index d119b3b4594..a65012e4148 100644 --- a/ydb/public/api/grpc/ydb_topic_v1.proto +++ b/ydb/public/api/grpc/ydb_topic_v1.proto @@ -69,15 +69,15 @@ service TopicService { // Create topic command. rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); - // Describe topic command. rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse); + // Describe topic's consumer command. + rpc DescribeConsumer(DescribeConsumerRequest) returns (DescribeConsumerResponse); // Alter topic command. rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse); - // Drop topic command. rpc DropTopic(DropTopicRequest) returns (DropTopicResponse); } diff --git a/ydb/public/api/protos/ydb_scheme.proto b/ydb/public/api/protos/ydb_scheme.proto index 04f717b6dfd..54b05f87e7a 100644 --- a/ydb/public/api/protos/ydb_scheme.proto +++ b/ydb/public/api/protos/ydb_scheme.proto @@ -8,6 +8,8 @@ option java_outer_classname = "SchemeOperationProtos"; import "ydb/public/api/protos/ydb_common.proto"; import "ydb/public/api/protos/ydb_operation.proto"; +import "google/protobuf/timestamp.proto"; + // Create directory. // All intermediate directories must be created message MakeDirectoryRequest { diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index f310bf16eef..a5a6c5abde6 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -290,6 +290,8 @@ message StreamReadMessage { repeated TopicReadSettings topics_read_settings = 1; // Path of consumer that is used for reading by this session. string consumer = 2; + // Optional name. Will be shown in debug stat. + string reader_name = 3; message TopicReadSettings { // Topic path. @@ -541,6 +543,12 @@ message AddOffsetsToTransactionResult { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Control messages +// message representing statistics by seleveral windows +message MultipleWindowsStat { + int64 per_minute = 1; + int64 per_hour = 2; + int64 per_day = 3; +} // Consumer description. message Consumer { @@ -559,6 +567,20 @@ message Consumer { // Attributes of consumer map<string, string> attributes = 6; + + // Filled only when requested statistics in Describe*Request. + ConsumerStats consumer_stats = 7; + + message ConsumerStats { + // Minimal timestamp of last read from partitions. + google.protobuf.Timestamp min_partitions_last_read_time = 1; + // Maximum of differences between timestamp of read and write timestamp for all messages, read during last minute. + google.protobuf.Duration max_read_time_lag = 2; + // Maximum of differences between write timestamp and create timestamp for all messages, read during last minute. + google.protobuf.Duration max_write_time_lag = 3; + // Bytes read stastics. + MultipleWindowsStat bytes_read = 4; + } } // Consumer alter description. @@ -670,6 +692,9 @@ message DescribeTopicRequest { // Topic path. string path = 2; + + // Include topic statistics. + bool include_stats = 3; } // Describe topic response sent from server to client. @@ -720,6 +745,9 @@ message DescribeTopicResult { // Metering settings. MeteringMode metering_mode = 12; + // Statistics of topic. + TopicStats topic_stats = 13; + message PartitionInfo { // Partition identifier. int64 partition_id = 1; @@ -729,7 +757,111 @@ message DescribeTopicResult { repeated int64 child_partition_ids = 3; // Ids of partitions from which this partition was formed by split or merge. repeated int64 parent_partition_ids = 4; + + // Stats for partition, filled only when include_stats in request is true. + PartitionStats partition_stats = 5; } + + message TopicStats { + // Approximate size of topic. + int64 store_size_bytes = 1; + + // Minimum of timestamps of last write among all partitions. + google.protobuf.Timestamp min_last_write_time = 2; + // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute. + google.protobuf.Duration max_write_time_lag = 3; + // How much bytes were written statistics. + MultipleWindowsStat bytes_written = 4; + } +} + + +// Describe topic's consumer request sent from client to server. +message DescribeConsumerRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Topic path. + string path = 2; + // Consumer name; + string consumer = 3; + // Include consumer statistics. + bool include_stats = 4; +} + +// Describe topic's consumer response sent from server to client. +// If topic is not existed then response status will be "SCHEME_ERROR". +message DescribeConsumerResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Describe topic's consumer result message that will be inside DescribeConsumerResponse.operation. +message DescribeConsumerResult { + // Description of scheme object. + Ydb.Scheme.Entry self = 1; + + Consumer consumer = 2; + + repeated PartitionInfo partitions = 3; + + message PartitionInfo { + // Partition identifier. + int64 partition_id = 1; + // Is partition open for write. + bool active = 2; + // Ids of partitions which was formed when this partition was split or merged. + repeated int64 child_partition_ids = 3; + // Ids of partitions from which this partition was formed by split or merge. + repeated int64 parent_partition_ids = 4; + + // Stats for partition, filled only when include_stats in request is true. + PartitionStats partition_stats = 5; + // Stats for consumer of this partition, filled only when include_stats in request is true. + PartitionConsumerStats partition_consumer_stats = 6; + } + + message PartitionConsumerStats { + // Last read offset from this partition. + int64 last_read_offset = 1; + // Committed offset for this partition. + int64 committed_offset = 2; + // Reading this partition read session identifier. + string read_session_id = 3; + + // Timestamp of providing this partition to this session by server. + google.protobuf.Timestamp partition_read_session_create_time = 4; + + // Timestamp of last read from this partition. + google.protobuf.Timestamp last_read_time = 5; + // Maximum of differences between timestamp of read and write timestamp for all messages, read during last minute. + google.protobuf.Duration max_read_time_lag = 6; + // Maximum of differences between write timestamp and create timestamp for all messages, read during last minute. + google.protobuf.Duration max_write_time_lag = 7; + + // How much bytes were read during several windows statistics from this partiton. + MultipleWindowsStat bytes_read = 8; + + // Read session name, provided by client. + string reader_name = 11; + // Host where read session connected. + int32 connection_node_id = 12; + } +} + +message PartitionStats { + // Partition contains messages with offsets in range [start, end). + OffsetsRange partition_offsets = 1; + // Approximate size of partition. + int64 store_size_bytes = 2; + // Timestamp of last write. + google.protobuf.Timestamp last_write_time = 3; + // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute. + google.protobuf.Duration max_write_time_lag = 4; + // How much bytes were written during several windows in this partition. + MultipleWindowsStat bytes_written = 5; + + // Host where tablet for this partition works. Useful for debugging purposes. + int32 partition_node_id = 8; } diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index b3da17970f2..08fe0a71dbf 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -185,7 +185,7 @@ namespace NKikimr::NGRpcProxy::V1 { NSchemeCache::TSchemeCacheNavigate::KindTopic) { this->Request_->RaiseIssue( FillIssue( - TStringBuilder() << "path '" << path << "' is not a stream", + TStringBuilder() << "path '" << path << "' is not a topic", Ydb::PersQueue::ErrorCode::ERROR ) ); diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.txt b/ydb/services/persqueue_v1/actors/CMakeLists.txt index 255fa7270e5..e65325c51d9 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.txt @@ -16,6 +16,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC ydb-core-base ydb-core-grpc_services ydb-core-persqueue + core-persqueue-events ydb-core-protos ydb-core-scheme core-tx-scheme_cache diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index f3d685fd438..536714d08cc 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -62,6 +62,7 @@ struct TEvPQProxy { EvUpdateToken, EvTopicUpdateToken, EvCommitRange, + EvRequestTablet, EvEnd }; @@ -435,6 +436,13 @@ struct TEvPQProxy { ui64 WriteTimestampEstimateMs; bool Init; }; + struct TEvRequestTablet : public NActors::TEventLocal<TEvRequestTablet, EvRequestTablet> { + TEvRequestTablet(const ui64 tabletId) + : TabletId(tabletId) + { } + + ui64 TabletId; + }; }; } diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 95b42062601..85a879c661f 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -617,6 +617,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& << "_" << "v1"; CommitsDisabled = false; + PeerName = ev->Get()->PeerName; + if constexpr (UseMigrationProtocol) { RangesMode = init.ranges_mode(); MaxReadMessagesCount = NormalizeMaxReadMessagesCount(init.read_params().max_read_messages_count()); @@ -631,6 +633,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& MaxTimeLagMs = 0; // max_lag per topic only ReadTimestampMs = 0; // read_from per topic only ReadOnlyLocal = true; + if (init.reader_name()) { + PeerName = init.reader_name(); + } } if (MaxTimeLagMs < 0) { @@ -641,7 +646,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "start_from_written_at_ms must be nonnegative number", ctx); } - PeerName = ev->Get()->PeerName; auto getTopicPath = [](const auto& settings) { if constexpr (UseMigrationProtocol) { diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index fe23b799d07..3fdfb0821f1 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -439,16 +439,408 @@ void TAlterTopicActor::ModifyPersqueueConfig( TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request) : TBase(request, request->GetProtoRequest()->path()) + , TDescribeTopicActorImpl("") { } -void TDescribeTopicActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { +TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request) + : TBase(request, request->GetProtoRequest()->path()) + , TDescribeTopicActorImpl(request->GetProtoRequest()->consumer()) +{ +} + +TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TString& consumer) + : Consumer(consumer) +{ +} + + +bool TDescribeTopicActorImpl::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { switch (ev->GetTypeRewrite()) { - default: TBase::StateWork(ev, ctx); + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + HFunc(TEvTabletPipe::TEvClientConnected, Handle); + HFunc(NKikimr::TEvPersQueue::TEvStatusResponse, Handle); + HFunc(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse, Handle); + default: return false; + } + return true; +} + +void TDescribeTopicActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { + if (!TDescribeTopicActorImpl::StateWork(ev, ctx)) { + TBase::StateWork(ev, ctx); + } +} + +void TDescribeConsumerActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) { + if (!TDescribeTopicActorImpl::StateWork(ev, ctx)) { + TBase::StateWork(ev, ctx); + } +} + + +void TDescribeTopicActorImpl::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + if (ev->Get()->Status != NKikimrProto::OK) { + RestartTablet(ev->Get()->TabletId, ctx, ev->Sender); + } else { + auto it = Tablets.find(ev->Get()->TabletId); + if (it == Tablets.end()) return; + it->second.NodeId = ev->Get()->ServerId.NodeId(); + } +} + +void TDescribeTopicActorImpl::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + RestartTablet(ev->Get()->TabletId, ctx, ev->Sender); +} + +void TDescribeTopicActor::RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) { + this->Request_->RaiseIssue(FillIssue(error, errorCode)); + TBase::Reply(status, ctx); +} + +void TDescribeConsumerActor::RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) { + this->Request_->RaiseIssue(FillIssue(error, errorCode)); + TBase::Reply(status, ctx); +} + + +void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe, const TDuration& delay) { + auto it = Tablets.find(tabletId); + if (it == Tablets.end()) return; + if (pipe && pipe != it->second.Pipe) return; + if (--it->second.RetriesLeft == 0) { + return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx); + } + Y_VERIFY(RequestsInfly > 0); + --RequestsInfly; + if (delay == TDuration::Zero()) { + RequestTablet(it->second, ctx); + } else { + ++RequestsInfly; + ctx.Schedule(delay, new TEvPQProxy::TEvRequestTablet(tabletId)); + } +} + +void TDescribeTopicActorImpl::Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx) { + --RequestsInfly; + auto it = Tablets.find(ev->Get()->TabletId); + if (it == Tablets.end()) return; + RequestTablet(it->second, ctx); +} + +void TDescribeTopicActorImpl::RequestTablet(TTabletInfo& tablet, const TActorContext& ctx) { + tablet.Pipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tablet.TabletId, NTabletPipe::TClientConfig(NTabletPipe::TClientRetryPolicy::WithRetries()))); + + if (tablet.TabletId == BalancerTabletId) { + THolder<NKikimr::TEvPersQueue::TEvGetReadSessionsInfo> ev(new NKikimr::TEvPersQueue::TEvGetReadSessionsInfo(Consumer)); + NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release()); + + } else { + THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus(Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Consumer), Consumer.empty())); + NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release()); + } + ++RequestsInfly; +} + +void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { + auto it = Tablets.find(ev->Get()->Record.GetTabletId()); + if (it == Tablets.end()) return; + --RequestsInfly; + NTabletPipe::CloseClient(ctx, it->second.Pipe); + it->second.Pipe = TActorId{}; + + auto& record = ev->Get()->Record; + for (auto& partResult : record.GetPartResult()) { + if (partResult.GetStatus() == NKikimrPQ::TStatusResponse::STATUS_INITIALIZING || + partResult.GetStatus() == NKikimrPQ::TStatusResponse::STATUS_UNKNOWN) { + RestartTablet(record.GetTabletId(), ctx, {}, TDuration::MilliSeconds(100)); + return; + } + } + + ApplyResponse(it->second, ev, ctx); + + if (RequestsInfly == 0) { + RequestAdditionalInfo(ctx); + if (RequestsInfly == 0) { + Reply(ctx); + } + } +} + + +void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) { + if (BalancerTabletId == 0) + return; + auto it = Tablets.find(BalancerTabletId); + Y_VERIFY(it != Tablets.end()); + --RequestsInfly; + NTabletPipe::CloseClient(ctx, it->second.Pipe); + it->second.Pipe = TActorId{}; + BalancerTabletId = 0; + + ApplyResponse(it->second, ev, ctx); + + if (RequestsInfly == 0) { + RequestAdditionalInfo(ctx); + if (RequestsInfly == 0) { + Reply(ctx); + } + } +} + + +void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) { + if (BalancerTabletId) { + RequestTablet(BalancerTabletId, ctx); + } +} + +void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) { + auto it = Tablets.find(tabletId); + if (it != Tablets.end()) { + RequestTablet(it->second, ctx); + } +} + + +template<class T> +void SetProtoTime(T* proto, const ui64 ms) { + proto->set_seconds(ms / 1000); + proto->set_nanos((ms % 1000) * 1'000'000); +} + +template<class T> +void UpdateProtoTime(T* proto, const ui64 ms, bool storeMin) { + ui64 storedMs = proto->seconds() * 1000 + proto->nanos() / 1'000'000; + if ((ms < storedMs) == storeMin) { + SetProtoTime(proto, ms); + } +} + + +void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ctx); + Y_UNUSED(tabletInfo); + Y_UNUSED(ev); + Y_FAIL(""); +} + + +void AddWindowsStat(Ydb::Topic::MultipleWindowsStat *stat, ui64 perMin, ui64 perHour, ui64 perDay) { + stat->set_per_minute(stat->per_minute() + perMin); + stat->set_per_hour(stat->per_hour() + perHour); + stat->set_per_day(stat->per_day() + perDay); +} + +void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ctx); + + auto& record = ev->Get()->Record; + + std::map<ui32, NKikimrPQ::TStatusResponse::TPartResult> res; + + auto topicStats = Result.mutable_topic_stats(); + + if (record.PartResultSize() > 0) { // init with first value + + SetProtoTime(topicStats->mutable_min_last_write_time(), record.GetPartResult(0).GetLastWriteTimestampMs()); + SetProtoTime(topicStats->mutable_max_write_time_lag(), record.GetPartResult(0).GetWriteLagMs()); + } + + std::map<TString, Ydb::Topic::Consumer*> consumersInfo; + for (auto& consumer : *Result.mutable_consumers()) { + consumersInfo[NPersQueue::ConvertNewConsumerName(consumer.name(), ctx)] = &consumer; + } + + for (auto& partResult : record.GetPartResult()) { + res[partResult.GetPartition()] = partResult; + + topicStats->set_store_size_bytes(topicStats->store_size_bytes() + partResult.GetPartitionSize()); + + UpdateProtoTime(topicStats->mutable_min_last_write_time(), partResult.GetLastWriteTimestampMs(), true); + UpdateProtoTime(topicStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs(), false); + + AddWindowsStat(topicStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay()); + + + for (auto& cons : partResult.GetConsumerResult()) { + auto it = consumersInfo.find(cons.GetConsumer()); + if (it == consumersInfo.end()) continue; + + if (!it->second->has_consumer_stats()) { + auto* stats = it->second->mutable_consumer_stats(); + + SetProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs()); + SetProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs()); + SetProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs()); + } else { + auto* stats = it->second->mutable_consumer_stats(); + + UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs(), true); + UpdateProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs(), false); + UpdateProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs(), false); + } + + AddWindowsStat(it->second->mutable_consumer_stats()->mutable_bytes_read(), cons.GetAvgReadSpeedPerMin(), cons.GetAvgReadSpeedPerHour(), cons.GetAvgReadSpeedPerDay()); + } + } + + for (auto& partRes : *(Result.mutable_partitions())) { + auto it = res.find(partRes.partition_id()); + if (it == res.end()) continue; + + const auto& partResult = it->second; + auto partStats = partRes.mutable_partition_stats(); + + partStats->set_store_size_bytes(partResult.GetPartitionSize()); + partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset()); + partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset()); + + SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs()); + SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs()); + + AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay()); + + partStats->set_partition_node_id(tabletInfo.NodeId); } } +void TDescribeTopicActor::Reply(const TActorContext& ctx) { + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); +} + +void TDescribeConsumerActor::Reply(const TActorContext& ctx) { + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); +} + + +void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ctx); + Y_UNUSED(tabletInfo); + + std::map<ui32, NKikimrPQ::TReadSessionsInfoResponse::TPartitionInfo> res; + + for (const auto& partInfo : ev->Get()->Record.GetPartitionInfo()) { + res[partInfo.GetPartition()] = partInfo; + } + for (auto& partRes : *(Result.mutable_partitions())) { + auto it = res.find(partRes.partition_id()); + if (it == res.end()) continue; + auto consRes = partRes.mutable_partition_consumer_stats(); + consRes->set_read_session_id(it->second.GetSession()); + SetProtoTime(consRes->mutable_partition_read_session_create_time(), it->second.GetTimestampMs()); + consRes->set_connection_node_id(it->second.GetProxyNodeId()); + consRes->set_reader_name(it->second.GetClientNode()); + } +} + + +void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ctx); + Y_UNUSED(tabletInfo); + + auto& record = ev->Get()->Record; + + std::map<ui32, NKikimrPQ::TStatusResponse::TPartResult> res; + + for (auto& partResult : record.GetPartResult()) { + res[partResult.GetPartition()] = partResult; + } + + for (auto& partRes : *(Result.mutable_partitions())) { + auto it = res.find(partRes.partition_id()); + if (it == res.end()) continue; + + const auto& partResult = it->second; + auto partStats = partRes.mutable_partition_stats(); + + partStats->set_store_size_bytes(partResult.GetPartitionSize()); + partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset()); + partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset()); + + SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs()); + SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs()); + + + AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay()); + + partStats->set_partition_node_id(tabletInfo.NodeId); + + if (Consumer) { + auto consStats = partRes.mutable_partition_consumer_stats(); + + consStats->set_last_read_offset(partResult.GetLagsInfo().GetReadPosition().GetOffset()); + consStats->set_committed_offset(partResult.GetLagsInfo().GetWritePosition().GetOffset()); + + SetProtoTime(consStats->mutable_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs()); + SetProtoTime(consStats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs()); + SetProtoTime(consStats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs()); + + AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay()); + + if (!Result.consumer().has_consumer_stats()) { + auto* stats = Result.mutable_consumer()->mutable_consumer_stats(); + + SetProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs()); + SetProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs()); + SetProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs()); + + AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay()); + } else { + auto* stats = Result.mutable_consumer()->mutable_consumer_stats(); + + UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs(), true); + UpdateProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs(), false); + UpdateProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs(), false); + + AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay()); + } + } + } +} + + + +bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i, + const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error) +{ + const auto &partConfig = config.GetPartitionConfig(); + const auto& pqConfig = AppData(ctx)->PQConfig; + + auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); + rr->set_name(consumerName); + rr->mutable_read_from()->set_seconds(config.GetReadFromTimestampsMs(i) / 1000); + auto version = config.GetReadRuleVersions(i); + if (version != 0) + (*rr->mutable_attributes())["_version"] = TStringBuilder() << version; + for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { + rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1)); + } + bool important = false; + for (const auto &c : partConfig.GetImportantClientId()) { + if (c == config.GetReadRules(i)) { + important = true; + break; + } + } + rr->set_important(important); + TString serviceType = ""; + if (i < config.ReadRuleServiceTypesSize()) { + serviceType = config.GetReadRuleServiceTypes(i); + } else { + if (pqConfig.GetDisallowDefaultClientServiceType()) { + error = "service type must be set for all read rules"; + status = Ydb::StatusIds::INTERNAL_ERROR; + return false; + } + serviceType = pqConfig.GetDefaultClientServiceType().GetName(); + } + (*rr->mutable_attributes())["_service_type"] = serviceType; + return true; +} + void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic if (ReplyIfNotTopic(ev, ctx)) { @@ -459,9 +851,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv const TString path = JoinSeq("/", response.Path); - Ydb::Topic::DescribeTopicResult result; - - Ydb::Scheme::Entry *selfEntry = result.mutable_self(); + Ydb::Scheme::Entry *selfEntry = Result.mutable_self(); ConvertDirectoryEntry(response.Self->Info, selfEntry, true); if (const auto& name = GetCdcStreamName()) { selfEntry->set_name(*name); @@ -469,109 +859,171 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv if (response.PQGroupInfo) { const auto &pqDescr = response.PQGroupInfo->Description; - result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount()); + Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount()); for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) { - auto part = result.add_partitions(); + auto part = Result.add_partitions(); part->set_partition_id(i); part->set_active(true); } const auto &config = pqDescr.GetPQTabletConfig(); if (!config.GetRequireAuthWrite()) { - (*result.mutable_attributes())["_allow_unauthenticated_write"] = "true"; + (*Result.mutable_attributes())["_allow_unauthenticated_write"] = "true"; } if (!config.GetRequireAuthRead()) { - (*result.mutable_attributes())["_allow_unauthenticated_read"] = "true"; + (*Result.mutable_attributes())["_allow_unauthenticated_read"] = "true"; } if (pqDescr.GetPartitionPerTablet() != 2) { - (*result.mutable_attributes())["_partitions_per_tablet"] = + (*Result.mutable_attributes())["_partitions_per_tablet"] = TStringBuilder() << pqDescr.GetPartitionPerTablet(); } if (config.HasAbcId()) { - (*result.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId(); + (*Result.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId(); } if (config.HasAbcSlug()) { - (*result.mutable_attributes())["_abc_slug"] = config.GetAbcSlug(); + (*Result.mutable_attributes())["_abc_slug"] = config.GetAbcSlug(); } if (config.HasFederationAccount()) { - (*result.mutable_attributes())["_federation_account"] = config.GetFederationAccount(); + (*Result.mutable_attributes())["_federation_account"] = config.GetFederationAccount(); } bool local = config.GetLocalDC(); const auto &partConfig = config.GetPartitionConfig(); i64 msip = partConfig.GetMaxSizeInPartition(); - if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>()) - (*result.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip ; - result.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds()); - result.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024); - (*result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000); - (*result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts(); + if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>()) { + (*Result.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip; + } + Result.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds()); + Result.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024); + (*Result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000); + (*Result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts(); const auto& pqConfig = AppData(ctx)->PQConfig; if (local || pqConfig.GetTopicsAreFirstClassCitizen()) { - result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond()); - result.set_partition_write_burst_bytes(partConfig.GetBurstSize()); + Result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond()); + Result.set_partition_write_burst_bytes(partConfig.GetBurstSize()); } for (const auto &codec : config.GetCodecs().GetIds()) { - result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1)); + Result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1)); } if (pqConfig.GetBillingMeteringConfig().GetEnabled()) { switch (config.GetMeteringMode()) { case NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY: - result.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY); + Result.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY); break; case NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS: - result.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS); + Result.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS); break; default: break; } } - + auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx); + bool found = false; for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { - auto rr = result.add_consumers(); - auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); - rr->set_name(consumerName); - rr->mutable_read_from()->set_seconds(config.GetReadFromTimestampsMs(i) / 1000); - auto version = config.GetReadRuleVersions(i); - if (version != 0) - (*rr->mutable_attributes())["_version"] = TStringBuilder() << version; - for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) { - rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1)); + if (consumerName == config.GetReadRules(i)) found = true; + auto rr = Result.add_consumers(); + Ydb::StatusIds::StatusCode status; + TString error; + if (!FillConsumerProto(rr, config, i, ctx, status, error)) { + return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx); } - bool important = false; - for (const auto &c : partConfig.GetImportantClientId()) { - if (c == config.GetReadRules(i)) { - important = true; - break; - } + } + + if (GetProtoRequest()->include_stats()) { + if (Consumer && !found) { + Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::ERROR)); + return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx); } - rr->set_important(important); - TString serviceType = ""; - if (i < config.ReadRuleServiceTypesSize()) { - serviceType = config.GetReadRuleServiceTypes(i); - } else { - if (pqConfig.GetDisallowDefaultClientServiceType()) { - this->Request_->RaiseIssue(FillIssue( - "service type must be set for all read rules", - Ydb::PersQueue::ErrorCode::ERROR - )); - Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); - return; - } - serviceType = pqConfig.GetDefaultClientServiceType().GetName(); + + ProcessTablets(pqDescr, ctx); + return; + } + } + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); +} + +void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic + if (ReplyIfNotTopic(ev, ctx)) { + return; + } + const auto& response = ev->Get()->Request.Get()->ResultSet.front(); + + const TString path = JoinSeq("/", response.Path); + + Ydb::Scheme::Entry *selfEntry = Result.mutable_self(); + ConvertDirectoryEntry(response.Self->Info, selfEntry, true); + //TODO: change entry + if (const auto& name = GetCdcStreamName()) { + selfEntry->set_name(*name); + } + selfEntry->set_name(selfEntry->name() + "/" + Consumer); + + if (response.PQGroupInfo) { + const auto& pqDescr = response.PQGroupInfo->Description; + const auto& config = pqDescr.GetPQTabletConfig(); + + for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) { + auto part = Result.add_partitions(); + part->set_partition_id(i); + part->set_active(true); + } + + auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx); + bool found = false; + for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { + if (consumerName != config.GetReadRules(i)) + continue; + found = true; + auto rr = Result.mutable_consumer(); + Ydb::StatusIds::StatusCode status; + TString error; + if (!FillConsumerProto(rr, config, i, ctx, status, error)) { + return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx); } - (*rr->mutable_attributes())["_service_type"] = serviceType; + break; + } + if (!found) { + Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::ERROR)); + return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + + if (GetProtoRequest()->include_stats()) { + ProcessTablets(pqDescr, ctx); + return; } } - return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx); + + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); } +bool TDescribeTopicActorImpl::ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx) { + for (ui32 i = 0; i < pqDescr.PartitionsSize(); ++i) { + const auto& pi = pqDescr.GetPartitions(i); + Tablets[pi.GetTabletId()].Partitions.push_back(pi.GetPartitionId()); + Tablets[pi.GetTabletId()].TabletId = pi.GetTabletId(); + } + for (auto& pair : Tablets) { + RequestTablet(pair.second, ctx); + } + if (!Consumer.empty()) { + BalancerTabletId = pqDescr.GetBalancerTabletID(); + Tablets[BalancerTabletId].TabletId = BalancerTabletId; + } + + if (RequestsInfly == 0) { + Reply(ctx); + return false; + } + return true; +} + void TDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); @@ -580,7 +1032,12 @@ void TDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx) Become(&TDescribeTopicActor::StateWork); } +void TDescribeConsumerActor::Bootstrap(const NActors::TActorContext& ctx) +{ + TBase::Bootstrap(ctx); - + SendDescribeProposeRequest(ctx); + Become(&TDescribeConsumerActor::StateWork); +} } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index 40dbe350a0d..e3834c298b6 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -1,7 +1,8 @@ #pragma once +#include "events.h" #include <ydb/services/lib/actors/pq_schema_actor.h> - +#include <ydb/core/persqueue/events/global.h> namespace NKikimr::NGRpcProxy::V1 { using namespace NKikimr::NGRpcService; @@ -40,7 +41,6 @@ public: void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); } }; - class TPQDescribeTopicActor : public TPQGrpcSchemaBase<TPQDescribeTopicActor, NKikimr::NGRpcService::TEvPQDescribeTopicRequest> , public TCdcStreamCompatible { @@ -57,22 +57,109 @@ public: void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); }; +class TDescribeTopicActorImpl +{ +protected: + struct TTabletInfo { + ui64 TabletId; + std::vector<ui32> Partitions; + TActorId Pipe; + ui32 NodeId = 0; + ui32 RetriesLeft = 3; + }; +public: + TDescribeTopicActorImpl(const TString& consumer); + virtual ~TDescribeTopicActorImpl() = default; + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); + + void Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx); + void Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx); + + void Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx); + + bool ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& description, const TActorContext& ctx); + + void RequestTablet(TTabletInfo& tablet, const TActorContext& ctx); + void RequestTablet(ui64 tabletId, const TActorContext& ctx); + void RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe = {}, const TDuration& delay = TDuration::Zero()); + void RequestAdditionalInfo(const TActorContext& ctx); + + bool StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); + + void Bootstrap(const NActors::TActorContext& ctx); + + virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) = 0; + + virtual void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) = 0; + virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) = 0; + virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) = 0; + virtual void Reply(const TActorContext& ctx) = 0; + +private: + + std::map<ui64, TTabletInfo> Tablets; + ui32 RequestsInfly = 0; + + ui64 BalancerTabletId; + +protected: + TString Consumer; +}; + class TDescribeTopicActor : public TPQGrpcSchemaBase<TDescribeTopicActor, NKikimr::NGRpcService::TEvDescribeTopicRequest> , public TCdcStreamCompatible + , public TDescribeTopicActorImpl { -using TBase = TPQGrpcSchemaBase<TDescribeTopicActor, TEvDescribeTopicRequest>; +using TBase = TPQGrpcSchemaBase<TDescribeTopicActor, NKikimr::NGRpcService::TEvDescribeTopicRequest>; +using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo; public: TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request); ~TDescribeTopicActor() = default; + void Bootstrap(const NActors::TActorContext& ctx); + void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override; + void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override; + void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override; + void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override; + virtual void Reply(const TActorContext& ctx) override; + +private: + Ydb::Topic::DescribeTopicResult Result; +}; + +class TDescribeConsumerActor : public TPQGrpcSchemaBase<TDescribeConsumerActor, NKikimr::NGRpcService::TEvDescribeConsumerRequest> + , public TCdcStreamCompatible + , public TDescribeTopicActorImpl +{ +using TBase = TPQGrpcSchemaBase<TDescribeConsumerActor, NKikimr::NGRpcService::TEvDescribeConsumerRequest>; +using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo; + +public: + TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request); + ~TDescribeConsumerActor() = default; + void Bootstrap(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); + + void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override; + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override; + void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override; + void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override; + virtual void Reply(const TActorContext& ctx) override; + +private: + Ydb::Topic::DescribeConsumerResult Result; }; + + class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest> , public TCdcStreamCompatible { diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp index 1c52b7149bb..b116f03c012 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp @@ -134,7 +134,10 @@ void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TP ctx.Register(new TDescribeTopicActor(ev->Release().Release())); } - +void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe consumer request"); + ctx.Register(new TDescribeConsumerActor(ev->Release().Release())); +} } @@ -171,6 +174,9 @@ void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEv ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } +void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) { + ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); +} void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h index 899cef430e5..58ba33eb587 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.h +++ b/ydb/services/persqueue_v1/grpc_pq_schema.h @@ -41,7 +41,7 @@ private: HFunc(NKikimr::NGRpcService::TEvCreateTopicRequest, Handle); HFunc(NKikimr::NGRpcService::TEvAlterTopicRequest, Handle); HFunc(NKikimr::NGRpcService::TEvDescribeTopicRequest, Handle); - + HFunc(NKikimr::NGRpcService::TEvDescribeConsumerRequest, Handle); hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle); } } @@ -57,6 +57,7 @@ private: void Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); + void Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx); void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 5e9d38d837f..ce18df18f47 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -3988,7 +3988,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { alter(request, Ydb::StatusIds::SUCCESS, false); TString topic4 = "rt3.dc1--acc--topic4"; - server.AnnoyingClient->CreateTopic(topic4, 1); //ensure creation + server.AnnoyingClient->CreateTopic(topic4, 3); //ensure creation auto res = server.AnnoyingClient->DescribeTopic({topic3}); Cerr << res.DebugString(); TString resultDescribe = R"___(TopicInfo { @@ -4121,6 +4121,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Ydb::Topic::DescribeTopicRequest request; Ydb::Topic::DescribeTopicResponse response; request.set_path(TStringBuilder() << "/Root/PQ/" << topic3); + grpc::ClientContext rcontext; auto status = TopicStubP_->DescribeTopic(&rcontext, request, &response); @@ -4129,6 +4130,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { Ydb::Topic::DescribeTopicResult res; response.operation().result().UnpackTo(&res); + Cerr << response.DebugString() << "\n" << res.DebugString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); res1 = res; } @@ -4148,10 +4151,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(status.ok()); Ydb::Topic::DescribeTopicResult descrRes; response.operation().result().UnpackTo(&descrRes); - Cerr << response << "\n" << descrRes << "\n"; + Cerr << response.DebugString() << "\n" << descrRes.DebugString() << "\n"; UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res1.DebugString()); + UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res1.DebugString()); { NYdb::TDriverConfig driverCfg; @@ -4221,6 +4224,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } } + { Ydb::Topic::DropTopicRequest request; Ydb::Topic::DropTopicResponse response; @@ -4236,7 +4240,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { server.AnnoyingClient->RemoveTopic(topic3); } - { Ydb::Topic::DropTopicRequest request; Ydb::Topic::DropTopicResponse response; @@ -4266,6 +4269,146 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { res.Wait(); Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; } + + for (ui32 i = 0; i < 5; ++ i) { + auto writer = CreateWriter(*driver, "acc/topic4", TStringBuilder() << "abacaba" << i); + auto ev = writer->GetEvent(true); + auto ct = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*ev); + UNIT_ASSERT(ct); + writer->Write(std::move(ct->ContinuationToken), "1234567890"); + UNIT_ASSERT(ev.Defined()); + while(true) { + ev = writer->GetEvent(true); + auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*ev); + if (ack) { + break; + } + } + } + + { + Ydb::Topic::DescribeTopicRequest request; + Ydb::Topic::DescribeTopicResponse response; + request.set_path(TStringBuilder() << "/Root/PQ/" << topic4); + request.set_include_stats(true); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->DescribeTopic(&rcontext, request, &response); + + UNIT_ASSERT(status.ok()); + Ydb::Topic::DescribeTopicResult res; + response.operation().result().UnpackTo(&res); + + Cerr << response.DebugString() << "\n" << res.DebugString() << "\n"; + + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(res.topic_stats().store_size_bytes(), 800); + UNIT_ASSERT_GE(res.partitions(0).partition_stats().partition_offsets().end(), 1); + } + + auto reader1 = CreateReader( + *driver, + NYdb::NPersQueue::TReadSessionSettings() + .AppendTopics( + NYdb::NPersQueue::TTopicReadSettings("acc/topic4") + ) + .ConsumerName("shared/user") + .ReadOnlyOriginal(true) + ); + int numLocks = 3; + while (numLocks > 0) { + auto msg = reader1->GetEvent(true, 1); + UNIT_ASSERT(msg); + + Cerr << "===Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n"; + + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); + UNIT_ASSERT(ev); + --numLocks; + } + + auto reader2 = CreateReader( + *driver, + NYdb::NPersQueue::TReadSessionSettings() + .AppendTopics( + NYdb::NPersQueue::TTopicReadSettings("acc/topic4") + ) + .ConsumerName("shared/user") + .ReadOnlyOriginal(true) + ); + + numLocks = 1; + while (numLocks > 0) { + { + auto msg = reader1->GetEvent(true, 1); + UNIT_ASSERT(msg); + Cerr << "===Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n"; + + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*msg); + UNIT_ASSERT(ev); + ev->Confirm(); + } + { + auto msg = reader2->GetEvent(true, 1); + UNIT_ASSERT(msg); + + Cerr << "===Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n"; + + auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg); + UNIT_ASSERT(ev); + } + --numLocks; + } + + { + Ydb::Topic::DescribeConsumerRequest request; + Ydb::Topic::DescribeConsumerResponse response; + request.set_path(TStringBuilder() << "/Root/PQ/" << topic4); + request.set_consumer("user"); + request.set_include_stats(true); + grpc::ClientContext rcontext; + + auto status = TopicStubP_->DescribeConsumer(&rcontext, request, &response); + + UNIT_ASSERT(status.ok()); + Ydb::Topic::DescribeConsumerResult res; + response.operation().result().UnpackTo(&res); + + Cerr << "DESCRIBE CONSUMER RESULT:\n" << response << "\n" << res.DebugString() << "\n"; + +// UNIT_ASSERT_GE(res.partitions(0).partition_stats().partition_offsets().end(), 1); + //TODO: check here some stats from describe consumer + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(res.partitions_size(), 3); + UNIT_ASSERT(res.partitions(0).partition_consumer_stats().read_session_id().size() > 0); + UNIT_ASSERT(res.partitions(1).partition_consumer_stats().read_session_id().size() > 0); + UNIT_ASSERT(res.partitions(2).partition_consumer_stats().read_session_id().size() > 0); + + } + + { + Ydb::Topic::DescribeConsumerRequest request; + Ydb::Topic::DescribeConsumerResponse response; + request.set_path(TStringBuilder() << "/Root/PQ/" << topic4); + request.set_consumer("not-consumer"); + request.set_include_stats(true); + + grpc::ClientContext rcontext; + + auto status = TopicStubP_->DescribeConsumer(&rcontext, request, &response); + + Cerr << response << "\n" << res << "\n"; + + UNIT_ASSERT(status.ok()); + Ydb::Topic::DescribeConsumerResult res; + response.operation().result().UnpackTo(&res); + + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR); + } + + + } Y_UNIT_TEST(SchemeOperationFirstClassCitizen) { diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index a1979e328df..730b03dabe2 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -120,7 +120,9 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeTopicRequest(ctx, IsRlAllowed())); }) - + ADD_REQUEST(DescribeConsumer, TopicService, DescribeConsumerRequest, DescribeConsumerResponse, { + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed())); + }) #undef ADD_REQUEST } |