aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-11-28 16:51:29 +0300
committeralexnick <alexnick@ydb.tech>2022-11-28 16:51:29 +0300
commitbe7707450e36daf4e717fc52ecb58a870f45a705 (patch)
tree2a693fe67656884d1ef021ad5d482ba638683a5e
parent00e9a853d87f80d1c24085555b7d06cc56112d99 (diff)
downloadydb-be7707450e36daf4e717fc52ecb58a870f45a705.tar.gz
topic describe stats
progress progress progress progress progress progress progress progress proto
-rw-r--r--ydb/core/grpc_services/base/base.h1
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp2
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h1
-rw-r--r--ydb/core/grpc_services/rpc_calls.h1
-rw-r--r--ydb/core/persqueue/events/global.h10
-rw-r--r--ydb/core/persqueue/events/internal.h4
-rw-r--r--ydb/core/persqueue/partition.cpp16
-rw-r--r--ydb/core/persqueue/pq_impl.cpp3
-rw-r--r--ydb/core/persqueue/read_balancer.cpp2
-rw-r--r--ydb/core/protos/pqconfig.proto17
-rw-r--r--ydb/public/api/grpc/ydb_topic_v1.proto4
-rw-r--r--ydb/public/api/protos/ydb_scheme.proto2
-rw-r--r--ydb/public/api/protos/ydb_topic.proto132
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.txt1
-rw-r--r--ydb/services/persqueue_v1/actors/events.h8
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp6
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp571
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.h95
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.cpp8
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.h3
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp151
-rw-r--r--ydb/services/persqueue_v1/topic.cpp4
23 files changed, 966 insertions, 78 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index ab69d7beb16..722ed130213 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -133,6 +133,7 @@ struct TRpcServices {
EvCreateTopic,
EvAlterTopic,
EvDescribeTopic,
+ EvDescribeConsumer,
EvGetDiskSpaceUsage,
EvStopServingDatabase,
EvCoordinationSession,
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 2cacc71877d..2976a3dd594 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -541,8 +541,8 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvCreateTopicRequest, PreHandle);
HFunc(TEvAlterTopicRequest, PreHandle);
HFunc(TEvDescribeTopicRequest, PreHandle);
+ HFunc(TEvDescribeConsumerRequest, PreHandle);
HFunc(TEvNodeCheckRequest, PreHandle);
-
HFunc(TEvProxyRuntimeEvent, PreHandle);
default:
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index c61c15fb7aa..4ff52e8e3a8 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -71,6 +71,7 @@ protected:
void Handle(TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
TActorId DiscoveryCacheActorID;
};
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 7614381de74..b8f318c4561 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -68,6 +68,7 @@ using TEvDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDropTo
using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTopic, Ydb::Topic::CreateTopicRequest, Ydb::Topic::CreateTopicResponse, true, TRateLimiterMode::Rps>;
using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>;
using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>;
+using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>;
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index 5ea9600c343..15a7964fd85 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -81,7 +81,11 @@ struct TEvPersQueue {
struct TEvGetReadSessionsInfo: public TEventPB<TEvGetReadSessionsInfo,
NKikimrPQ::TGetReadSessionsInfo, EvGetReadSessionsInfo> {
- TEvGetReadSessionsInfo() {}
+ TEvGetReadSessionsInfo(const TString& consumer = "") {
+ if (!consumer.empty()) {
+ Record.SetClientId(consumer);
+ }
+ }
};
struct TEvReadSessionsInfoResponse: public TEventPB<TEvReadSessionsInfoResponse,
@@ -125,9 +129,11 @@ struct TEvPersQueue {
struct TEvStatus : public TEventPB<TEvStatus,
NKikimrPQ::TStatus, EvStatus> {
- explicit TEvStatus(const TString& consumer = "") {
+ explicit TEvStatus(const TString& consumer = "", bool getStatForAllConsumers = false) {
if (!consumer.empty())
Record.SetClientId(consumer);
+ if (getStatForAllConsumers)
+ Record.SetGetStatForAllConsumers(true);
}
};
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index afc8bf66514..6d60b347d52 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -310,13 +310,15 @@ struct TEvPQ {
};
struct TEvPartitionStatus : public TEventLocal<TEvPartitionStatus, EvPartitionStatus> {
- explicit TEvPartitionStatus(const TActorId& sender, const TString& clientId)
+ explicit TEvPartitionStatus(const TActorId& sender, const TString& clientId, bool getStatForAllConsumers)
: Sender(sender)
, ClientId(clientId)
+ , GetStatForAllConsumers(getStatForAllConsumers)
{}
TActorId Sender;
TString ClientId;
+ bool GetStatForAllConsumers;
};
struct TEvPartitionStatusResponse : public TEventLocal<TEvPartitionStatusResponse, EvPartitionStatusResponse> {
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index eca824c0dd4..1e203bce1f1 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2094,6 +2094,22 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
ui64 totalLag = clientInfo->GetReadLagMs() + userInfo.GetWriteLagMs() + (ctx.Now() - userInfo.GetReadTimestamp()).MilliSeconds();
clientInfo->SetTotalLagMs(totalLag);
}
+
+ if (ev->Get()->GetStatForAllConsumers) { //fill lags
+ auto* clientInfo = result.AddConsumerResult();
+ clientInfo->SetConsumer(userInfo.User);
+ auto readTimestamp = (userInfo.GetReadWriteTimestamp() ? userInfo.GetReadWriteTimestamp() : GetWriteTimeEstimate(userInfo.GetReadOffset())).MilliSeconds();
+ clientInfo->SetReadLagMs(userInfo.GetReadOffset() < (i64)EndOffset
+ ? (userInfo.GetReadTimestamp() - TInstant::MilliSeconds(readTimestamp)).MilliSeconds()
+ : 0);
+ clientInfo->SetLastReadTimestampMs(userInfo.GetReadTimestamp().MilliSeconds());
+ clientInfo->SetWriteLagMs(userInfo.GetWriteLagMs());
+
+ clientInfo->SetAvgReadSpeedPerMin(userInfo.AvgReadBytes[1].GetValue());
+ clientInfo->SetAvgReadSpeedPerHour(userInfo.AvgReadBytes[2].GetValue());
+ clientInfo->SetAvgReadSpeedPerDay(userInfo.AvgReadBytes[3].GetValue());
+ }
+
}
result.SetAvgReadSpeedPerSec(resSpeed[0]);
result.SetAvgReadSpeedPerMin(resSpeed[1]);
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 8ea588c752d..d78a6cc03b7 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1422,7 +1422,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
for (auto& p : Partitions) {
if (!p.second.InitDone)
continue;
- THolder<TEvPQ::TEvPartitionStatus> event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "");
+ THolder<TEvPQ::TEvPartitionStatus> event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "",
+ ev->Get()->Record.HasGetStatForAllConsumers() ? ev->Get()->Record.GetGetStatForAllConsumers() : false);
ctx.Send(p.second.Actor, event.Release());
}
}
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 7b183f066b1..89676eef2ea 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -904,11 +904,13 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&
pi->SetProxyNodeId(jt->second.ProxyNodeId);
pi->SetSession(jt->second.Session);
pi->SetTimestamp(jt->second.Timestamp.Seconds());
+ pi->SetTimestampMs(jt->second.Timestamp.MilliSeconds());
} else {
pi->SetClientNode("");
pi->SetProxyNodeId(0);
pi->SetSession("");
pi->SetTimestamp(0);
+ pi->SetTimestampMs(0);
}
}
for (auto& s : c.second.SessionsInfo) {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 0e3b900812f..2345a6b759b 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -470,7 +470,6 @@ message TReadSessionStatusResponse {
optional string ClientNode = 6;
optional uint32 ProxyNodeId = 7;
-
}
@@ -481,6 +480,7 @@ message TReadSessionsInfoResponse {
optional uint32 ProxyNodeId = 3;
optional string Session = 4;
optional uint64 Timestamp = 5;
+ optional uint64 TimestampMs = 6;
}
repeated TPartitionInfo PartitionInfo = 1;
optional uint64 TabletId = 2;
@@ -587,6 +587,7 @@ message TOffsetsResponse {
message TStatus {
optional string ClientId = 1;
+ optional bool GetStatForAllConsumers = 2;
}
message TClientPosition {
@@ -662,6 +663,20 @@ message TStatusResponse {
optional int64 SourceIdRetentionPeriodSec = 28;
repeated TErrorMessage Errors = 29;
+
+ repeated TConsumerResult ConsumerResult = 30;
+ }
+
+ message TConsumerResult {
+ optional string Consumer = 1;
+
+ optional int64 AvgReadSpeedPerMin = 2;
+ optional int64 AvgReadSpeedPerHour = 3;
+ optional int64 AvgReadSpeedPerDay = 4;
+
+ optional uint64 WriteLagMs = 5;
+ optional uint64 ReadLagMs = 6;
+ optional uint64 LastReadTimestampMs = 7;
}
optional uint64 TabletId = 1;
diff --git a/ydb/public/api/grpc/ydb_topic_v1.proto b/ydb/public/api/grpc/ydb_topic_v1.proto
index d119b3b4594..a65012e4148 100644
--- a/ydb/public/api/grpc/ydb_topic_v1.proto
+++ b/ydb/public/api/grpc/ydb_topic_v1.proto
@@ -69,15 +69,15 @@ service TopicService {
// Create topic command.
rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse);
-
// Describe topic command.
rpc DescribeTopic(DescribeTopicRequest) returns (DescribeTopicResponse);
+ // Describe topic's consumer command.
+ rpc DescribeConsumer(DescribeConsumerRequest) returns (DescribeConsumerResponse);
// Alter topic command.
rpc AlterTopic(AlterTopicRequest) returns (AlterTopicResponse);
-
// Drop topic command.
rpc DropTopic(DropTopicRequest) returns (DropTopicResponse);
}
diff --git a/ydb/public/api/protos/ydb_scheme.proto b/ydb/public/api/protos/ydb_scheme.proto
index 04f717b6dfd..54b05f87e7a 100644
--- a/ydb/public/api/protos/ydb_scheme.proto
+++ b/ydb/public/api/protos/ydb_scheme.proto
@@ -8,6 +8,8 @@ option java_outer_classname = "SchemeOperationProtos";
import "ydb/public/api/protos/ydb_common.proto";
import "ydb/public/api/protos/ydb_operation.proto";
+import "google/protobuf/timestamp.proto";
+
// Create directory.
// All intermediate directories must be created
message MakeDirectoryRequest {
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index f310bf16eef..a5a6c5abde6 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -290,6 +290,8 @@ message StreamReadMessage {
repeated TopicReadSettings topics_read_settings = 1;
// Path of consumer that is used for reading by this session.
string consumer = 2;
+ // Optional name. Will be shown in debug stat.
+ string reader_name = 3;
message TopicReadSettings {
// Topic path.
@@ -541,6 +543,12 @@ message AddOffsetsToTransactionResult {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Control messages
+// message representing statistics by seleveral windows
+message MultipleWindowsStat {
+ int64 per_minute = 1;
+ int64 per_hour = 2;
+ int64 per_day = 3;
+}
// Consumer description.
message Consumer {
@@ -559,6 +567,20 @@ message Consumer {
// Attributes of consumer
map<string, string> attributes = 6;
+
+ // Filled only when requested statistics in Describe*Request.
+ ConsumerStats consumer_stats = 7;
+
+ message ConsumerStats {
+ // Minimal timestamp of last read from partitions.
+ google.protobuf.Timestamp min_partitions_last_read_time = 1;
+ // Maximum of differences between timestamp of read and write timestamp for all messages, read during last minute.
+ google.protobuf.Duration max_read_time_lag = 2;
+ // Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
+ google.protobuf.Duration max_write_time_lag = 3;
+ // Bytes read stastics.
+ MultipleWindowsStat bytes_read = 4;
+ }
}
// Consumer alter description.
@@ -670,6 +692,9 @@ message DescribeTopicRequest {
// Topic path.
string path = 2;
+
+ // Include topic statistics.
+ bool include_stats = 3;
}
// Describe topic response sent from server to client.
@@ -720,6 +745,9 @@ message DescribeTopicResult {
// Metering settings.
MeteringMode metering_mode = 12;
+ // Statistics of topic.
+ TopicStats topic_stats = 13;
+
message PartitionInfo {
// Partition identifier.
int64 partition_id = 1;
@@ -729,7 +757,111 @@ message DescribeTopicResult {
repeated int64 child_partition_ids = 3;
// Ids of partitions from which this partition was formed by split or merge.
repeated int64 parent_partition_ids = 4;
+
+ // Stats for partition, filled only when include_stats in request is true.
+ PartitionStats partition_stats = 5;
}
+
+ message TopicStats {
+ // Approximate size of topic.
+ int64 store_size_bytes = 1;
+
+ // Minimum of timestamps of last write among all partitions.
+ google.protobuf.Timestamp min_last_write_time = 2;
+ // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute.
+ google.protobuf.Duration max_write_time_lag = 3;
+ // How much bytes were written statistics.
+ MultipleWindowsStat bytes_written = 4;
+ }
+}
+
+
+// Describe topic's consumer request sent from client to server.
+message DescribeConsumerRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+
+ // Topic path.
+ string path = 2;
+ // Consumer name;
+ string consumer = 3;
+ // Include consumer statistics.
+ bool include_stats = 4;
+}
+
+// Describe topic's consumer response sent from server to client.
+// If topic is not existed then response status will be "SCHEME_ERROR".
+message DescribeConsumerResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Describe topic's consumer result message that will be inside DescribeConsumerResponse.operation.
+message DescribeConsumerResult {
+ // Description of scheme object.
+ Ydb.Scheme.Entry self = 1;
+
+ Consumer consumer = 2;
+
+ repeated PartitionInfo partitions = 3;
+
+ message PartitionInfo {
+ // Partition identifier.
+ int64 partition_id = 1;
+ // Is partition open for write.
+ bool active = 2;
+ // Ids of partitions which was formed when this partition was split or merged.
+ repeated int64 child_partition_ids = 3;
+ // Ids of partitions from which this partition was formed by split or merge.
+ repeated int64 parent_partition_ids = 4;
+
+ // Stats for partition, filled only when include_stats in request is true.
+ PartitionStats partition_stats = 5;
+ // Stats for consumer of this partition, filled only when include_stats in request is true.
+ PartitionConsumerStats partition_consumer_stats = 6;
+ }
+
+ message PartitionConsumerStats {
+ // Last read offset from this partition.
+ int64 last_read_offset = 1;
+ // Committed offset for this partition.
+ int64 committed_offset = 2;
+ // Reading this partition read session identifier.
+ string read_session_id = 3;
+
+ // Timestamp of providing this partition to this session by server.
+ google.protobuf.Timestamp partition_read_session_create_time = 4;
+
+ // Timestamp of last read from this partition.
+ google.protobuf.Timestamp last_read_time = 5;
+ // Maximum of differences between timestamp of read and write timestamp for all messages, read during last minute.
+ google.protobuf.Duration max_read_time_lag = 6;
+ // Maximum of differences between write timestamp and create timestamp for all messages, read during last minute.
+ google.protobuf.Duration max_write_time_lag = 7;
+
+ // How much bytes were read during several windows statistics from this partiton.
+ MultipleWindowsStat bytes_read = 8;
+
+ // Read session name, provided by client.
+ string reader_name = 11;
+ // Host where read session connected.
+ int32 connection_node_id = 12;
+ }
+}
+
+message PartitionStats {
+ // Partition contains messages with offsets in range [start, end).
+ OffsetsRange partition_offsets = 1;
+ // Approximate size of partition.
+ int64 store_size_bytes = 2;
+ // Timestamp of last write.
+ google.protobuf.Timestamp last_write_time = 3;
+ // Maximum of differences between write timestamp and create timestamp for all messages, written during last minute.
+ google.protobuf.Duration max_write_time_lag = 4;
+ // How much bytes were written during several windows in this partition.
+ MultipleWindowsStat bytes_written = 5;
+
+ // Host where tablet for this partition works. Useful for debugging purposes.
+ int32 partition_node_id = 8;
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index b3da17970f2..08fe0a71dbf 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -185,7 +185,7 @@ namespace NKikimr::NGRpcProxy::V1 {
NSchemeCache::TSchemeCacheNavigate::KindTopic) {
this->Request_->RaiseIssue(
FillIssue(
- TStringBuilder() << "path '" << path << "' is not a stream",
+ TStringBuilder() << "path '" << path << "' is not a topic",
Ydb::PersQueue::ErrorCode::ERROR
)
);
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.txt b/ydb/services/persqueue_v1/actors/CMakeLists.txt
index 255fa7270e5..e65325c51d9 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.txt
@@ -16,6 +16,7 @@ target_link_libraries(services-persqueue_v1-actors PUBLIC
ydb-core-base
ydb-core-grpc_services
ydb-core-persqueue
+ core-persqueue-events
ydb-core-protos
ydb-core-scheme
core-tx-scheme_cache
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index f3d685fd438..536714d08cc 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -62,6 +62,7 @@ struct TEvPQProxy {
EvUpdateToken,
EvTopicUpdateToken,
EvCommitRange,
+ EvRequestTablet,
EvEnd
};
@@ -435,6 +436,13 @@ struct TEvPQProxy {
ui64 WriteTimestampEstimateMs;
bool Init;
};
+ struct TEvRequestTablet : public NActors::TEventLocal<TEvRequestTablet, EvRequestTablet> {
+ TEvRequestTablet(const ui64 tabletId)
+ : TabletId(tabletId)
+ { }
+
+ ui64 TabletId;
+ };
};
}
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 95b42062601..85a879c661f 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -617,6 +617,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
<< "_" << "v1";
CommitsDisabled = false;
+ PeerName = ev->Get()->PeerName;
+
if constexpr (UseMigrationProtocol) {
RangesMode = init.ranges_mode();
MaxReadMessagesCount = NormalizeMaxReadMessagesCount(init.read_params().max_read_messages_count());
@@ -631,6 +633,9 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
MaxTimeLagMs = 0; // max_lag per topic only
ReadTimestampMs = 0; // read_from per topic only
ReadOnlyLocal = true;
+ if (init.reader_name()) {
+ PeerName = init.reader_name();
+ }
}
if (MaxTimeLagMs < 0) {
@@ -641,7 +646,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "start_from_written_at_ms must be nonnegative number", ctx);
}
- PeerName = ev->Get()->PeerName;
auto getTopicPath = [](const auto& settings) {
if constexpr (UseMigrationProtocol) {
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index fe23b799d07..3fdfb0821f1 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -439,16 +439,408 @@ void TAlterTopicActor::ModifyPersqueueConfig(
TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request)
: TBase(request, request->GetProtoRequest()->path())
+ , TDescribeTopicActorImpl("")
{
}
-void TDescribeTopicActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request)
+ : TBase(request, request->GetProtoRequest()->path())
+ , TDescribeTopicActorImpl(request->GetProtoRequest()->consumer())
+{
+}
+
+TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TString& consumer)
+ : Consumer(consumer)
+{
+}
+
+
+bool TDescribeTopicActorImpl::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
switch (ev->GetTypeRewrite()) {
- default: TBase::StateWork(ev, ctx);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ HFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ HFunc(NKikimr::TEvPersQueue::TEvStatusResponse, Handle);
+ HFunc(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse, Handle);
+ default: return false;
+ }
+ return true;
+}
+
+void TDescribeTopicActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+ if (!TDescribeTopicActorImpl::StateWork(ev, ctx)) {
+ TBase::StateWork(ev, ctx);
+ }
+}
+
+void TDescribeConsumerActor::StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
+ if (!TDescribeTopicActorImpl::StateWork(ev, ctx)) {
+ TBase::StateWork(ev, ctx);
+ }
+}
+
+
+void TDescribeTopicActorImpl::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ RestartTablet(ev->Get()->TabletId, ctx, ev->Sender);
+ } else {
+ auto it = Tablets.find(ev->Get()->TabletId);
+ if (it == Tablets.end()) return;
+ it->second.NodeId = ev->Get()->ServerId.NodeId();
+ }
+}
+
+void TDescribeTopicActorImpl::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ RestartTablet(ev->Get()->TabletId, ctx, ev->Sender);
+}
+
+void TDescribeTopicActor::RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) {
+ this->Request_->RaiseIssue(FillIssue(error, errorCode));
+ TBase::Reply(status, ctx);
+}
+
+void TDescribeConsumerActor::RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) {
+ this->Request_->RaiseIssue(FillIssue(error, errorCode));
+ TBase::Reply(status, ctx);
+}
+
+
+void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe, const TDuration& delay) {
+ auto it = Tablets.find(tabletId);
+ if (it == Tablets.end()) return;
+ if (pipe && pipe != it->second.Pipe) return;
+ if (--it->second.RetriesLeft == 0) {
+ return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx);
+ }
+ Y_VERIFY(RequestsInfly > 0);
+ --RequestsInfly;
+ if (delay == TDuration::Zero()) {
+ RequestTablet(it->second, ctx);
+ } else {
+ ++RequestsInfly;
+ ctx.Schedule(delay, new TEvPQProxy::TEvRequestTablet(tabletId));
+ }
+}
+
+void TDescribeTopicActorImpl::Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx) {
+ --RequestsInfly;
+ auto it = Tablets.find(ev->Get()->TabletId);
+ if (it == Tablets.end()) return;
+ RequestTablet(it->second, ctx);
+}
+
+void TDescribeTopicActorImpl::RequestTablet(TTabletInfo& tablet, const TActorContext& ctx) {
+ tablet.Pipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tablet.TabletId, NTabletPipe::TClientConfig(NTabletPipe::TClientRetryPolicy::WithRetries())));
+
+ if (tablet.TabletId == BalancerTabletId) {
+ THolder<NKikimr::TEvPersQueue::TEvGetReadSessionsInfo> ev(new NKikimr::TEvPersQueue::TEvGetReadSessionsInfo(Consumer));
+ NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release());
+
+ } else {
+ THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus(Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Consumer), Consumer.empty()));
+ NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release());
+ }
+ ++RequestsInfly;
+}
+
+void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
+ auto it = Tablets.find(ev->Get()->Record.GetTabletId());
+ if (it == Tablets.end()) return;
+ --RequestsInfly;
+ NTabletPipe::CloseClient(ctx, it->second.Pipe);
+ it->second.Pipe = TActorId{};
+
+ auto& record = ev->Get()->Record;
+ for (auto& partResult : record.GetPartResult()) {
+ if (partResult.GetStatus() == NKikimrPQ::TStatusResponse::STATUS_INITIALIZING ||
+ partResult.GetStatus() == NKikimrPQ::TStatusResponse::STATUS_UNKNOWN) {
+ RestartTablet(record.GetTabletId(), ctx, {}, TDuration::MilliSeconds(100));
+ return;
+ }
+ }
+
+ ApplyResponse(it->second, ev, ctx);
+
+ if (RequestsInfly == 0) {
+ RequestAdditionalInfo(ctx);
+ if (RequestsInfly == 0) {
+ Reply(ctx);
+ }
+ }
+}
+
+
+void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) {
+ if (BalancerTabletId == 0)
+ return;
+ auto it = Tablets.find(BalancerTabletId);
+ Y_VERIFY(it != Tablets.end());
+ --RequestsInfly;
+ NTabletPipe::CloseClient(ctx, it->second.Pipe);
+ it->second.Pipe = TActorId{};
+ BalancerTabletId = 0;
+
+ ApplyResponse(it->second, ev, ctx);
+
+ if (RequestsInfly == 0) {
+ RequestAdditionalInfo(ctx);
+ if (RequestsInfly == 0) {
+ Reply(ctx);
+ }
+ }
+}
+
+
+void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) {
+ if (BalancerTabletId) {
+ RequestTablet(BalancerTabletId, ctx);
+ }
+}
+
+void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) {
+ auto it = Tablets.find(tabletId);
+ if (it != Tablets.end()) {
+ RequestTablet(it->second, ctx);
+ }
+}
+
+
+template<class T>
+void SetProtoTime(T* proto, const ui64 ms) {
+ proto->set_seconds(ms / 1000);
+ proto->set_nanos((ms % 1000) * 1'000'000);
+}
+
+template<class T>
+void UpdateProtoTime(T* proto, const ui64 ms, bool storeMin) {
+ ui64 storedMs = proto->seconds() * 1000 + proto->nanos() / 1'000'000;
+ if ((ms < storedMs) == storeMin) {
+ SetProtoTime(proto, ms);
+ }
+}
+
+
+void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+ Y_UNUSED(tabletInfo);
+ Y_UNUSED(ev);
+ Y_FAIL("");
+}
+
+
+void AddWindowsStat(Ydb::Topic::MultipleWindowsStat *stat, ui64 perMin, ui64 perHour, ui64 perDay) {
+ stat->set_per_minute(stat->per_minute() + perMin);
+ stat->set_per_hour(stat->per_hour() + perHour);
+ stat->set_per_day(stat->per_day() + perDay);
+}
+
+void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+
+ auto& record = ev->Get()->Record;
+
+ std::map<ui32, NKikimrPQ::TStatusResponse::TPartResult> res;
+
+ auto topicStats = Result.mutable_topic_stats();
+
+ if (record.PartResultSize() > 0) { // init with first value
+
+ SetProtoTime(topicStats->mutable_min_last_write_time(), record.GetPartResult(0).GetLastWriteTimestampMs());
+ SetProtoTime(topicStats->mutable_max_write_time_lag(), record.GetPartResult(0).GetWriteLagMs());
+ }
+
+ std::map<TString, Ydb::Topic::Consumer*> consumersInfo;
+ for (auto& consumer : *Result.mutable_consumers()) {
+ consumersInfo[NPersQueue::ConvertNewConsumerName(consumer.name(), ctx)] = &consumer;
+ }
+
+ for (auto& partResult : record.GetPartResult()) {
+ res[partResult.GetPartition()] = partResult;
+
+ topicStats->set_store_size_bytes(topicStats->store_size_bytes() + partResult.GetPartitionSize());
+
+ UpdateProtoTime(topicStats->mutable_min_last_write_time(), partResult.GetLastWriteTimestampMs(), true);
+ UpdateProtoTime(topicStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs(), false);
+
+ AddWindowsStat(topicStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay());
+
+
+ for (auto& cons : partResult.GetConsumerResult()) {
+ auto it = consumersInfo.find(cons.GetConsumer());
+ if (it == consumersInfo.end()) continue;
+
+ if (!it->second->has_consumer_stats()) {
+ auto* stats = it->second->mutable_consumer_stats();
+
+ SetProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs());
+ SetProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs());
+ SetProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs());
+ } else {
+ auto* stats = it->second->mutable_consumer_stats();
+
+ UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), cons.GetLastReadTimestampMs(), true);
+ UpdateProtoTime(stats->mutable_max_read_time_lag(), cons.GetReadLagMs(), false);
+ UpdateProtoTime(stats->mutable_max_write_time_lag(), cons.GetWriteLagMs(), false);
+ }
+
+ AddWindowsStat(it->second->mutable_consumer_stats()->mutable_bytes_read(), cons.GetAvgReadSpeedPerMin(), cons.GetAvgReadSpeedPerHour(), cons.GetAvgReadSpeedPerDay());
+ }
+ }
+
+ for (auto& partRes : *(Result.mutable_partitions())) {
+ auto it = res.find(partRes.partition_id());
+ if (it == res.end()) continue;
+
+ const auto& partResult = it->second;
+ auto partStats = partRes.mutable_partition_stats();
+
+ partStats->set_store_size_bytes(partResult.GetPartitionSize());
+ partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset());
+ partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset());
+
+ SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs());
+ SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs());
+
+ AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay());
+
+ partStats->set_partition_node_id(tabletInfo.NodeId);
}
}
+void TDescribeTopicActor::Reply(const TActorContext& ctx) {
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
+}
+
+void TDescribeConsumerActor::Reply(const TActorContext& ctx) {
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
+}
+
+
+void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+ Y_UNUSED(tabletInfo);
+
+ std::map<ui32, NKikimrPQ::TReadSessionsInfoResponse::TPartitionInfo> res;
+
+ for (const auto& partInfo : ev->Get()->Record.GetPartitionInfo()) {
+ res[partInfo.GetPartition()] = partInfo;
+ }
+ for (auto& partRes : *(Result.mutable_partitions())) {
+ auto it = res.find(partRes.partition_id());
+ if (it == res.end()) continue;
+ auto consRes = partRes.mutable_partition_consumer_stats();
+ consRes->set_read_session_id(it->second.GetSession());
+ SetProtoTime(consRes->mutable_partition_read_session_create_time(), it->second.GetTimestampMs());
+ consRes->set_connection_node_id(it->second.GetProxyNodeId());
+ consRes->set_reader_name(it->second.GetClientNode());
+ }
+}
+
+
+void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+ Y_UNUSED(tabletInfo);
+
+ auto& record = ev->Get()->Record;
+
+ std::map<ui32, NKikimrPQ::TStatusResponse::TPartResult> res;
+
+ for (auto& partResult : record.GetPartResult()) {
+ res[partResult.GetPartition()] = partResult;
+ }
+
+ for (auto& partRes : *(Result.mutable_partitions())) {
+ auto it = res.find(partRes.partition_id());
+ if (it == res.end()) continue;
+
+ const auto& partResult = it->second;
+ auto partStats = partRes.mutable_partition_stats();
+
+ partStats->set_store_size_bytes(partResult.GetPartitionSize());
+ partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset());
+ partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset());
+
+ SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs());
+ SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs());
+
+
+ AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay());
+
+ partStats->set_partition_node_id(tabletInfo.NodeId);
+
+ if (Consumer) {
+ auto consStats = partRes.mutable_partition_consumer_stats();
+
+ consStats->set_last_read_offset(partResult.GetLagsInfo().GetReadPosition().GetOffset());
+ consStats->set_committed_offset(partResult.GetLagsInfo().GetWritePosition().GetOffset());
+
+ SetProtoTime(consStats->mutable_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs());
+ SetProtoTime(consStats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs());
+ SetProtoTime(consStats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs());
+
+ AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay());
+
+ if (!Result.consumer().has_consumer_stats()) {
+ auto* stats = Result.mutable_consumer()->mutable_consumer_stats();
+
+ SetProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs());
+ SetProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs());
+ SetProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs());
+
+ AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay());
+ } else {
+ auto* stats = Result.mutable_consumer()->mutable_consumer_stats();
+
+ UpdateProtoTime(stats->mutable_min_partitions_last_read_time(), partResult.GetLagsInfo().GetLastReadTimestampMs(), true);
+ UpdateProtoTime(stats->mutable_max_read_time_lag(), partResult.GetLagsInfo().GetReadLagMs(), false);
+ UpdateProtoTime(stats->mutable_max_write_time_lag(), partResult.GetLagsInfo().GetWriteLagMs(), false);
+
+ AddWindowsStat(consStats->mutable_bytes_read(), partResult.GetAvgReadSpeedPerMin(), partResult.GetAvgReadSpeedPerHour(), partResult.GetAvgReadSpeedPerDay());
+ }
+ }
+ }
+}
+
+
+
+bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i,
+ const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error)
+{
+ const auto &partConfig = config.GetPartitionConfig();
+ const auto& pqConfig = AppData(ctx)->PQConfig;
+
+ auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx);
+ rr->set_name(consumerName);
+ rr->mutable_read_from()->set_seconds(config.GetReadFromTimestampsMs(i) / 1000);
+ auto version = config.GetReadRuleVersions(i);
+ if (version != 0)
+ (*rr->mutable_attributes())["_version"] = TStringBuilder() << version;
+ for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) {
+ rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1));
+ }
+ bool important = false;
+ for (const auto &c : partConfig.GetImportantClientId()) {
+ if (c == config.GetReadRules(i)) {
+ important = true;
+ break;
+ }
+ }
+ rr->set_important(important);
+ TString serviceType = "";
+ if (i < config.ReadRuleServiceTypesSize()) {
+ serviceType = config.GetReadRuleServiceTypes(i);
+ } else {
+ if (pqConfig.GetDisallowDefaultClientServiceType()) {
+ error = "service type must be set for all read rules";
+ status = Ydb::StatusIds::INTERNAL_ERROR;
+ return false;
+ }
+ serviceType = pqConfig.GetDefaultClientServiceType().GetName();
+ }
+ (*rr->mutable_attributes())["_service_type"] = serviceType;
+ return true;
+}
+
void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
if (ReplyIfNotTopic(ev, ctx)) {
@@ -459,9 +851,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
const TString path = JoinSeq("/", response.Path);
- Ydb::Topic::DescribeTopicResult result;
-
- Ydb::Scheme::Entry *selfEntry = result.mutable_self();
+ Ydb::Scheme::Entry *selfEntry = Result.mutable_self();
ConvertDirectoryEntry(response.Self->Info, selfEntry, true);
if (const auto& name = GetCdcStreamName()) {
selfEntry->set_name(*name);
@@ -469,109 +859,171 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
if (response.PQGroupInfo) {
const auto &pqDescr = response.PQGroupInfo->Description;
- result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());
+ Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());
for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) {
- auto part = result.add_partitions();
+ auto part = Result.add_partitions();
part->set_partition_id(i);
part->set_active(true);
}
const auto &config = pqDescr.GetPQTabletConfig();
if (!config.GetRequireAuthWrite()) {
- (*result.mutable_attributes())["_allow_unauthenticated_write"] = "true";
+ (*Result.mutable_attributes())["_allow_unauthenticated_write"] = "true";
}
if (!config.GetRequireAuthRead()) {
- (*result.mutable_attributes())["_allow_unauthenticated_read"] = "true";
+ (*Result.mutable_attributes())["_allow_unauthenticated_read"] = "true";
}
if (pqDescr.GetPartitionPerTablet() != 2) {
- (*result.mutable_attributes())["_partitions_per_tablet"] =
+ (*Result.mutable_attributes())["_partitions_per_tablet"] =
TStringBuilder() << pqDescr.GetPartitionPerTablet();
}
if (config.HasAbcId()) {
- (*result.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId();
+ (*Result.mutable_attributes())["_abc_id"] = TStringBuilder() << config.GetAbcId();
}
if (config.HasAbcSlug()) {
- (*result.mutable_attributes())["_abc_slug"] = config.GetAbcSlug();
+ (*Result.mutable_attributes())["_abc_slug"] = config.GetAbcSlug();
}
if (config.HasFederationAccount()) {
- (*result.mutable_attributes())["_federation_account"] = config.GetFederationAccount();
+ (*Result.mutable_attributes())["_federation_account"] = config.GetFederationAccount();
}
bool local = config.GetLocalDC();
const auto &partConfig = config.GetPartitionConfig();
i64 msip = partConfig.GetMaxSizeInPartition();
- if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>())
- (*result.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip ;
- result.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds());
- result.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024);
- (*result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
- (*result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();
+ if (partConfig.HasMaxSizeInPartition() && msip != Max<i64>()) {
+ (*Result.mutable_attributes())["_max_partition_storage_size"] = TStringBuilder() << msip;
+ }
+ Result.mutable_retention_period()->set_seconds(partConfig.GetLifetimeSeconds());
+ Result.set_retention_storage_mb(partConfig.GetStorageLimitBytes() / 1024 / 1024);
+ (*Result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
+ (*Result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();
const auto& pqConfig = AppData(ctx)->PQConfig;
if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
- result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond());
- result.set_partition_write_burst_bytes(partConfig.GetBurstSize());
+ Result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond());
+ Result.set_partition_write_burst_bytes(partConfig.GetBurstSize());
}
for (const auto &codec : config.GetCodecs().GetIds()) {
- result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
+ Result.mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec)(codec + 1));
}
if (pqConfig.GetBillingMeteringConfig().GetEnabled()) {
switch (config.GetMeteringMode()) {
case NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY:
- result.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
+ Result.set_metering_mode(Ydb::Topic::METERING_MODE_RESERVED_CAPACITY);
break;
case NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS:
- result.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS);
+ Result.set_metering_mode(Ydb::Topic::METERING_MODE_REQUEST_UNITS);
break;
default:
break;
}
}
-
+ auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx);
+ bool found = false;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
- auto rr = result.add_consumers();
- auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx);
- rr->set_name(consumerName);
- rr->mutable_read_from()->set_seconds(config.GetReadFromTimestampsMs(i) / 1000);
- auto version = config.GetReadRuleVersions(i);
- if (version != 0)
- (*rr->mutable_attributes())["_version"] = TStringBuilder() << version;
- for (const auto &codec : config.GetConsumerCodecs(i).GetIds()) {
- rr->mutable_supported_codecs()->add_codecs((Ydb::Topic::Codec) (codec + 1));
+ if (consumerName == config.GetReadRules(i)) found = true;
+ auto rr = Result.add_consumers();
+ Ydb::StatusIds::StatusCode status;
+ TString error;
+ if (!FillConsumerProto(rr, config, i, ctx, status, error)) {
+ return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx);
}
- bool important = false;
- for (const auto &c : partConfig.GetImportantClientId()) {
- if (c == config.GetReadRules(i)) {
- important = true;
- break;
- }
+ }
+
+ if (GetProtoRequest()->include_stats()) {
+ if (Consumer && !found) {
+ Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::ERROR));
+ return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx);
}
- rr->set_important(important);
- TString serviceType = "";
- if (i < config.ReadRuleServiceTypesSize()) {
- serviceType = config.GetReadRuleServiceTypes(i);
- } else {
- if (pqConfig.GetDisallowDefaultClientServiceType()) {
- this->Request_->RaiseIssue(FillIssue(
- "service type must be set for all read rules",
- Ydb::PersQueue::ErrorCode::ERROR
- ));
- Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
- return;
- }
- serviceType = pqConfig.GetDefaultClientServiceType().GetName();
+
+ ProcessTablets(pqDescr, ctx);
+ return;
+ }
+ }
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
+}
+
+void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
+ if (ReplyIfNotTopic(ev, ctx)) {
+ return;
+ }
+ const auto& response = ev->Get()->Request.Get()->ResultSet.front();
+
+ const TString path = JoinSeq("/", response.Path);
+
+ Ydb::Scheme::Entry *selfEntry = Result.mutable_self();
+ ConvertDirectoryEntry(response.Self->Info, selfEntry, true);
+ //TODO: change entry
+ if (const auto& name = GetCdcStreamName()) {
+ selfEntry->set_name(*name);
+ }
+ selfEntry->set_name(selfEntry->name() + "/" + Consumer);
+
+ if (response.PQGroupInfo) {
+ const auto& pqDescr = response.PQGroupInfo->Description;
+ const auto& config = pqDescr.GetPQTabletConfig();
+
+ for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) {
+ auto part = Result.add_partitions();
+ part->set_partition_id(i);
+ part->set_active(true);
+ }
+
+ auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx);
+ bool found = false;
+ for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
+ if (consumerName != config.GetReadRules(i))
+ continue;
+ found = true;
+ auto rr = Result.mutable_consumer();
+ Ydb::StatusIds::StatusCode status;
+ TString error;
+ if (!FillConsumerProto(rr, config, i, ctx, status, error)) {
+ return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx);
}
- (*rr->mutable_attributes())["_service_type"] = serviceType;
+ break;
+ }
+ if (!found) {
+ Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::ERROR));
+ return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ }
+
+ if (GetProtoRequest()->include_stats()) {
+ ProcessTablets(pqDescr, ctx);
+ return;
}
}
- return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx);
+
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
}
+bool TDescribeTopicActorImpl::ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx) {
+ for (ui32 i = 0; i < pqDescr.PartitionsSize(); ++i) {
+ const auto& pi = pqDescr.GetPartitions(i);
+ Tablets[pi.GetTabletId()].Partitions.push_back(pi.GetPartitionId());
+ Tablets[pi.GetTabletId()].TabletId = pi.GetTabletId();
+ }
+ for (auto& pair : Tablets) {
+ RequestTablet(pair.second, ctx);
+ }
+ if (!Consumer.empty()) {
+ BalancerTabletId = pqDescr.GetBalancerTabletID();
+ Tablets[BalancerTabletId].TabletId = BalancerTabletId;
+ }
+
+ if (RequestsInfly == 0) {
+ Reply(ctx);
+ return false;
+ }
+ return true;
+}
+
void TDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx)
{
TBase::Bootstrap(ctx);
@@ -580,7 +1032,12 @@ void TDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx)
Become(&TDescribeTopicActor::StateWork);
}
+void TDescribeConsumerActor::Bootstrap(const NActors::TActorContext& ctx)
+{
+ TBase::Bootstrap(ctx);
-
+ SendDescribeProposeRequest(ctx);
+ Become(&TDescribeConsumerActor::StateWork);
+}
}
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h
index 40dbe350a0d..e3834c298b6 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.h
+++ b/ydb/services/persqueue_v1/actors/schema_actors.h
@@ -1,7 +1,8 @@
#pragma once
+#include "events.h"
#include <ydb/services/lib/actors/pq_schema_actor.h>
-
+#include <ydb/core/persqueue/events/global.h>
namespace NKikimr::NGRpcProxy::V1 {
using namespace NKikimr::NGRpcService;
@@ -40,7 +41,6 @@ public:
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); }
};
-
class TPQDescribeTopicActor : public TPQGrpcSchemaBase<TPQDescribeTopicActor, NKikimr::NGRpcService::TEvPQDescribeTopicRequest>
, public TCdcStreamCompatible
{
@@ -57,22 +57,109 @@ public:
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
};
+class TDescribeTopicActorImpl
+{
+protected:
+ struct TTabletInfo {
+ ui64 TabletId;
+ std::vector<ui32> Partitions;
+ TActorId Pipe;
+ ui32 NodeId = 0;
+ ui32 RetriesLeft = 3;
+ };
+public:
+ TDescribeTopicActorImpl(const TString& consumer);
+ virtual ~TDescribeTopicActorImpl() = default;
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
+
+ void Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx);
+
+ void Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx);
+
+ bool ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& description, const TActorContext& ctx);
+
+ void RequestTablet(TTabletInfo& tablet, const TActorContext& ctx);
+ void RequestTablet(ui64 tabletId, const TActorContext& ctx);
+ void RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe = {}, const TDuration& delay = TDuration::Zero());
+ void RequestAdditionalInfo(const TActorContext& ctx);
+
+ bool StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+ virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) = 0;
+
+ virtual void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) = 0;
+ virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) = 0;
+ virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) = 0;
+ virtual void Reply(const TActorContext& ctx) = 0;
+
+private:
+
+ std::map<ui64, TTabletInfo> Tablets;
+ ui32 RequestsInfly = 0;
+
+ ui64 BalancerTabletId;
+
+protected:
+ TString Consumer;
+};
+
class TDescribeTopicActor : public TPQGrpcSchemaBase<TDescribeTopicActor, NKikimr::NGRpcService::TEvDescribeTopicRequest>
, public TCdcStreamCompatible
+ , public TDescribeTopicActorImpl
{
-using TBase = TPQGrpcSchemaBase<TDescribeTopicActor, TEvDescribeTopicRequest>;
+using TBase = TPQGrpcSchemaBase<TDescribeTopicActor, NKikimr::NGRpcService::TEvDescribeTopicRequest>;
+using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
public:
TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request);
~TDescribeTopicActor() = default;
+ void Bootstrap(const NActors::TActorContext& ctx);
+ void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override;
+
void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override;
+ void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
+ void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override;
+ virtual void Reply(const TActorContext& ctx) override;
+
+private:
+ Ydb::Topic::DescribeTopicResult Result;
+};
+
+class TDescribeConsumerActor : public TPQGrpcSchemaBase<TDescribeConsumerActor, NKikimr::NGRpcService::TEvDescribeConsumerRequest>
+ , public TCdcStreamCompatible
+ , public TDescribeTopicActorImpl
+{
+using TBase = TPQGrpcSchemaBase<TDescribeConsumerActor, NKikimr::NGRpcService::TEvDescribeConsumerRequest>;
+using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
+
+public:
+ TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request);
+ ~TDescribeConsumerActor() = default;
+
void Bootstrap(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
+
+ void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override;
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override;
+ void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
+ void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override;
+ virtual void Reply(const TActorContext& ctx) override;
+
+private:
+ Ydb::Topic::DescribeConsumerResult Result;
};
+
+
class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest>
, public TCdcStreamCompatible
{
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
index 1c52b7149bb..b116f03c012 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
@@ -134,7 +134,10 @@ void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TP
ctx.Register(new TDescribeTopicActor(ev->Release().Release()));
}
-
+void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe consumer request");
+ ctx.Register(new TDescribeConsumerActor(ev->Release().Release()));
+}
}
@@ -171,6 +174,9 @@ void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEv
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
+void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) {
+ ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
+}
void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvPQAddReadRuleRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h
index 899cef430e5..58ba33eb587 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.h
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.h
@@ -41,7 +41,7 @@ private:
HFunc(NKikimr::NGRpcService::TEvCreateTopicRequest, Handle);
HFunc(NKikimr::NGRpcService::TEvAlterTopicRequest, Handle);
HFunc(NKikimr::NGRpcService::TEvDescribeTopicRequest, Handle);
-
+ HFunc(NKikimr::NGRpcService::TEvDescribeConsumerRequest, Handle);
hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
}
}
@@ -57,6 +57,7 @@ private:
void Handle(NKikimr::NGRpcService::TEvCreateTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 5e9d38d837f..ce18df18f47 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -3988,7 +3988,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
alter(request, Ydb::StatusIds::SUCCESS, false);
TString topic4 = "rt3.dc1--acc--topic4";
- server.AnnoyingClient->CreateTopic(topic4, 1); //ensure creation
+ server.AnnoyingClient->CreateTopic(topic4, 3); //ensure creation
auto res = server.AnnoyingClient->DescribeTopic({topic3});
Cerr << res.DebugString();
TString resultDescribe = R"___(TopicInfo {
@@ -4121,6 +4121,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Ydb::Topic::DescribeTopicRequest request;
Ydb::Topic::DescribeTopicResponse response;
request.set_path(TStringBuilder() << "/Root/PQ/" << topic3);
+
grpc::ClientContext rcontext;
auto status = TopicStubP_->DescribeTopic(&rcontext, request, &response);
@@ -4129,6 +4130,8 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
Ydb::Topic::DescribeTopicResult res;
response.operation().result().UnpackTo(&res);
+ Cerr << response.DebugString() << "\n" << res.DebugString() << "\n";
+
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
res1 = res;
}
@@ -4148,10 +4151,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(status.ok());
Ydb::Topic::DescribeTopicResult descrRes;
response.operation().result().UnpackTo(&descrRes);
- Cerr << response << "\n" << descrRes << "\n";
+ Cerr << response.DebugString() << "\n" << descrRes.DebugString() << "\n";
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res1.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res1.DebugString());
{
NYdb::TDriverConfig driverCfg;
@@ -4221,6 +4224,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}
+
{
Ydb::Topic::DropTopicRequest request;
Ydb::Topic::DropTopicResponse response;
@@ -4236,7 +4240,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
server.AnnoyingClient->RemoveTopic(topic3);
}
-
{
Ydb::Topic::DropTopicRequest request;
Ydb::Topic::DropTopicResponse response;
@@ -4266,6 +4269,146 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
res.Wait();
Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
}
+
+ for (ui32 i = 0; i < 5; ++ i) {
+ auto writer = CreateWriter(*driver, "acc/topic4", TStringBuilder() << "abacaba" << i);
+ auto ev = writer->GetEvent(true);
+ auto ct = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TReadyToAcceptEvent>(&*ev);
+ UNIT_ASSERT(ct);
+ writer->Write(std::move(ct->ContinuationToken), "1234567890");
+ UNIT_ASSERT(ev.Defined());
+ while(true) {
+ ev = writer->GetEvent(true);
+ auto ack = std::get_if<NYdb::NPersQueue::TWriteSessionEvent::TAcksEvent>(&*ev);
+ if (ack) {
+ break;
+ }
+ }
+ }
+
+ {
+ Ydb::Topic::DescribeTopicRequest request;
+ Ydb::Topic::DescribeTopicResponse response;
+ request.set_path(TStringBuilder() << "/Root/PQ/" << topic4);
+ request.set_include_stats(true);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->DescribeTopic(&rcontext, request, &response);
+
+ UNIT_ASSERT(status.ok());
+ Ydb::Topic::DescribeTopicResult res;
+ response.operation().result().UnpackTo(&res);
+
+ Cerr << response.DebugString() << "\n" << res.DebugString() << "\n";
+
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(res.topic_stats().store_size_bytes(), 800);
+ UNIT_ASSERT_GE(res.partitions(0).partition_stats().partition_offsets().end(), 1);
+ }
+
+ auto reader1 = CreateReader(
+ *driver,
+ NYdb::NPersQueue::TReadSessionSettings()
+ .AppendTopics(
+ NYdb::NPersQueue::TTopicReadSettings("acc/topic4")
+ )
+ .ConsumerName("shared/user")
+ .ReadOnlyOriginal(true)
+ );
+ int numLocks = 3;
+ while (numLocks > 0) {
+ auto msg = reader1->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ Cerr << "===Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n";
+
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
+ UNIT_ASSERT(ev);
+ --numLocks;
+ }
+
+ auto reader2 = CreateReader(
+ *driver,
+ NYdb::NPersQueue::TReadSessionSettings()
+ .AppendTopics(
+ NYdb::NPersQueue::TTopicReadSettings("acc/topic4")
+ )
+ .ConsumerName("shared/user")
+ .ReadOnlyOriginal(true)
+ );
+
+ numLocks = 1;
+ while (numLocks > 0) {
+ {
+ auto msg = reader1->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+ Cerr << "===Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n";
+
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDestroyPartitionStreamEvent>(&*msg);
+ UNIT_ASSERT(ev);
+ ev->Confirm();
+ }
+ {
+ auto msg = reader2->GetEvent(true, 1);
+ UNIT_ASSERT(msg);
+
+ Cerr << "===Got message: " << NYdb::NPersQueue::DebugString(*msg) << "\n";
+
+ auto ev = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TCreatePartitionStreamEvent>(&*msg);
+ UNIT_ASSERT(ev);
+ }
+ --numLocks;
+ }
+
+ {
+ Ydb::Topic::DescribeConsumerRequest request;
+ Ydb::Topic::DescribeConsumerResponse response;
+ request.set_path(TStringBuilder() << "/Root/PQ/" << topic4);
+ request.set_consumer("user");
+ request.set_include_stats(true);
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->DescribeConsumer(&rcontext, request, &response);
+
+ UNIT_ASSERT(status.ok());
+ Ydb::Topic::DescribeConsumerResult res;
+ response.operation().result().UnpackTo(&res);
+
+ Cerr << "DESCRIBE CONSUMER RESULT:\n" << response << "\n" << res.DebugString() << "\n";
+
+// UNIT_ASSERT_GE(res.partitions(0).partition_stats().partition_offsets().end(), 1);
+ //TODO: check here some stats from describe consumer
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(res.partitions_size(), 3);
+ UNIT_ASSERT(res.partitions(0).partition_consumer_stats().read_session_id().size() > 0);
+ UNIT_ASSERT(res.partitions(1).partition_consumer_stats().read_session_id().size() > 0);
+ UNIT_ASSERT(res.partitions(2).partition_consumer_stats().read_session_id().size() > 0);
+
+ }
+
+ {
+ Ydb::Topic::DescribeConsumerRequest request;
+ Ydb::Topic::DescribeConsumerResponse response;
+ request.set_path(TStringBuilder() << "/Root/PQ/" << topic4);
+ request.set_consumer("not-consumer");
+ request.set_include_stats(true);
+
+ grpc::ClientContext rcontext;
+
+ auto status = TopicStubP_->DescribeConsumer(&rcontext, request, &response);
+
+ Cerr << response << "\n" << res << "\n";
+
+ UNIT_ASSERT(status.ok());
+ Ydb::Topic::DescribeConsumerResult res;
+ response.operation().result().UnpackTo(&res);
+
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR);
+ }
+
+
+
}
Y_UNIT_TEST(SchemeOperationFirstClassCitizen) {
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index a1979e328df..730b03dabe2 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -120,7 +120,9 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
ADD_REQUEST(DescribeTopic, TopicService, DescribeTopicRequest, DescribeTopicResponse, {
ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeTopicRequest(ctx, IsRlAllowed()));
})
-
+ ADD_REQUEST(DescribeConsumer, TopicService, DescribeConsumerRequest, DescribeConsumerResponse, {
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed()));
+ })
#undef ADD_REQUEST
}