diff options
author | alexnick <alexnick@ydb.tech> | 2022-12-02 15:47:41 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-12-02 15:47:41 +0300 |
commit | 37960f794987f2412b440c0f8cd8ccb9c8fa7de4 (patch) | |
tree | 1d9615d352dd09458dc1485d949b4638c5118b3e | |
parent | ce0f540c0f9598245163d4826c6a2560cc17142e (diff) | |
download | ydb-37960f794987f2412b440c0f8cd8ccb9c8fa7de4.tar.gz |
support of stat in cpp sdk
progress
progress
progress
progress
provide codec in get records
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_proto/accessor.h | 1 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp | 87 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h | 38 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp | 4 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_topic/topic.h | 75 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 37 |
6 files changed, 239 insertions, 3 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h index 756c6f792c..5ffcf39b29 100644 --- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h +++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h @@ -40,6 +40,7 @@ public: static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats); static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription); static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription); + static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription); static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult); static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index f6abfca822..eddcade51c 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -27,6 +27,16 @@ const TTopicDescription& TDescribeTopicResult::GetTopicDescription() const { return TopicDescription_; } +TDescribeConsumerResult::TDescribeConsumerResult(TStatus&& status, Ydb::Topic::DescribeConsumerResult&& result) + : TStatus(std::move(status)) + , ConsumerDescription_(std::move(result)) +{ +} + +const TConsumerDescription& TDescribeConsumerResult::GetConsumerDescription() const { + return ConsumerDescription_; +} + TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) : Proto_(std::move(result)) , PartitioningSettings_(Proto_.partitioning_settings()) @@ -54,6 +64,16 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result) } } +TConsumerDescription::TConsumerDescription(Ydb::Topic::DescribeConsumerResult&& result) + : Proto_(std::move(result)) + , Consumer_(result.consumer()) +{ + for (const auto& part : Proto_.partitions()) { + Partitions_.emplace_back(part); + } +} + + TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer) : ConsumerName_(consumer.name()) , Important_(consumer.important()) @@ -95,7 +115,11 @@ ui32 TTopicDescription::GetTotalPartitionsCount() const { return Partitions_.size(); } -const TVector<TPartitionInfo>& TTopicDescription::GetPartitions() const { +const TVector<TPartitionInfo>& TTopicDescription::GetPartitions() const { + return Partitions_; +} + +const TVector<TPartitionInfo>& TConsumerDescription::GetPartitions() const { return Partitions_; } @@ -140,6 +164,10 @@ const Ydb::Topic::DescribeTopicResult& TTopicDescription::GetProto() const { return Proto_; } +const Ydb::Topic::DescribeConsumerResult& TConsumerDescription::GetProto() const { + return Proto_; +} + const TString& TTopicDescription::GetOwner() const { return Owner_; } @@ -165,9 +193,50 @@ ui64 TPartitioningSettings::GetPartitionCountLimit() const { return PartitionCountLimit_; } +TPartitionStats::TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats) + : StartOffset_(partitionStats.partition_offsets().start()) + , EndOffset_(partitionStats.partition_offsets().end()) +{} + +ui64 TPartitionStats::GetStartOffset() const { + return StartOffset_; +} + +ui64 TPartitionStats::GetEndOffset() const { + return EndOffset_; +} + +TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsumerResult::PartitionConsumerStats& partitionStats) + : CommittedOffset_(partitionStats.committed_offset()) +{} + +ui64 TPartitionConsumerStats::GetCommittedOffset() const { + return CommittedOffset_; +} + + + TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo) : PartitionId_(partitionInfo.partition_id()) , Active_(partitionInfo.active()) + , PartitionStats_() +{ + for (const auto& partId : partitionInfo.child_partition_ids()) { + ChildPartitionIds_.push_back(partId); + } + + for (const auto& partId : partitionInfo.parent_partition_ids()) { + ParentPartitionIds_.push_back(partId); + } + if (partitionInfo.has_partition_stats()) { + PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()}; + } +} + +TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo) + : PartitionId_(partitionInfo.partition_id()) + , Active_(partitionInfo.active()) + , PartitionStats_() { for (const auto& partId : partitionInfo.child_partition_ids()) { ChildPartitionIds_.push_back(partId); @@ -176,6 +245,18 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI for (const auto& partId : partitionInfo.parent_partition_ids()) { ParentPartitionIds_.push_back(partId); } + if (partitionInfo.has_partition_stats()) { + PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()}; + PartitionConsumerStats_ = TPartitionConsumerStats{partitionInfo.partition_consumer_stats()}; + } +} + +const TMaybe<TPartitionStats>& TPartitionInfo::GetPartitionStats() const { + return PartitionStats_; +} + +const TMaybe<TPartitionConsumerStats>& TPartitionInfo::GetPartitionConsumerStats() const { + return PartitionConsumerStats_; } bool TPartitionInfo::GetActive() const { @@ -204,6 +285,10 @@ TAsyncDescribeTopicResult TTopicClient::DescribeTopic(const TString& path, const return Impl_->DescribeTopic(path, settings); } +TAsyncDescribeConsumerResult TTopicClient::DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& settings) { + return Impl_->DescribeConsumer(path, consumer, settings); +} + IRetryPolicy::TPtr IRetryPolicy::GetDefaultPolicy() { static IRetryPolicy::TPtr policy = GetExponentialBackoffPolicy(); return policy; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h index de512c6ebb..563205f60a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h @@ -171,6 +171,10 @@ public: auto request = MakeOperationRequest<Ydb::Topic::DescribeTopicRequest>(settings); request.set_path(path); + if (settings.IncludeStats_) { + request.set_include_stats(true); + } + auto promise = NThreading::NewPromise<TDescribeTopicResult>(); auto extractor = [promise] @@ -195,6 +199,40 @@ public: return promise.GetFuture(); } + TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& settings) { + auto request = MakeOperationRequest<Ydb::Topic::DescribeConsumerRequest>(settings); + request.set_path(path); + request.set_consumer(consumer); + + if (settings.IncludeStats_) { + request.set_include_stats(true); + } + + auto promise = NThreading::NewPromise<TDescribeConsumerResult>(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::Topic::DescribeConsumerResult result; + if (any) { + any->UnpackTo(&result); + } + + TDescribeConsumerResult val(TStatus(std::move(status)), std::move(result)); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse>( + std::move(request), + extractor, + &Ydb::Topic::V1::TopicService::Stub::AsyncDescribeConsumer, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return promise.GetFuture(); + } + // Runtime API. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp index b0e7353d25..9ae37fc3d0 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp @@ -5,6 +5,10 @@ namespace NYdb { return topicDescription.GetProto(); } + const Ydb::Topic::DescribeConsumerResult& TProtoAccessor::GetProto(const NTopic::TConsumerDescription& consumerDescription) { + return consumerDescription.GetProto(); + } + Ydb::Topic::MeteringMode TProtoAccessor::GetProto(NTopic::EMeteringMode mode) { switch (mode) { case NTopic::EMeteringMode::Unspecified: diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index b5470532f2..fce6adf49a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -62,19 +62,46 @@ private: TVector<ECodec> SupportedCodecs_; }; +class TPartitionStats { +public: + TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats); + + ui64 GetStartOffset() const; + ui64 GetEndOffset() const; +private: + ui64 StartOffset_; + ui64 EndOffset_; +}; + +class TPartitionConsumerStats { +public: + TPartitionConsumerStats(const Ydb::Topic::DescribeConsumerResult::PartitionConsumerStats& partitionStats); + ui64 GetCommittedOffset() const; + +private: + ui64 CommittedOffset_; +}; + class TPartitionInfo { public: TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo); + TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo); + ui64 GetPartitionId() const; bool GetActive() const; const TVector<ui64> GetChildPartitionIds() const; const TVector<ui64> GetParentPartitionIds() const; + const TMaybe<TPartitionStats>& GetPartitionStats() const; + const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const; + private: ui64 PartitionId_; bool Active_; TVector<ui64> ChildPartitionIds_; TVector<ui64> ParentPartitionIds_; + TMaybe<TPartitionStats> PartitionStats_; + TMaybe<TPartitionConsumerStats> PartitionConsumerStats_; }; @@ -149,6 +176,27 @@ private: }; +class TConsumerDescription { + friend class NYdb::TProtoAccessor; + +public: + TConsumerDescription(Ydb::Topic::DescribeConsumerResult&& desc); + + const TVector<TPartitionInfo>& GetPartitions() const; + + const TConsumer& GetConsumer() const; + +private: + + const Ydb::Topic::DescribeConsumerResult& GetProto() const; + + + const Ydb::Topic::DescribeConsumerResult Proto_; + TVector<TPartitionInfo> Partitions_; + TConsumer Consumer_; +}; + + // Result for describe resource request. struct TDescribeTopicResult : public TStatus { friend class NYdb::TProtoAccessor; @@ -162,7 +210,22 @@ private: TTopicDescription TopicDescription_; }; +// Result for describe resource request. +struct TDescribeConsumerResult : public TStatus { + friend class NYdb::TProtoAccessor; + + + TDescribeConsumerResult(TStatus&& status, Ydb::Topic::DescribeConsumerResult&& result); + + const TConsumerDescription& GetConsumerDescription() const; + +private: + TConsumerDescription ConsumerDescription_; +}; + + using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>; +using TAsyncDescribeConsumerResult = NThreading::TFuture<TDescribeConsumerResult>; template <class TSettings> class TAlterAttributesBuilderImpl { @@ -418,7 +481,14 @@ struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSetting struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings> {}; // Settings for describe resource request. -struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> {}; +struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> { + FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); +}; + +// Settings for describe resource request. +struct TDescribeConsumerSettings : public TOperationRequestSettings<TDescribeConsumerSettings> { + FLUENT_SETTING_DEFAULT(bool, IncludeStats, false); +}; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -1465,6 +1535,9 @@ public: // Describe settings of topic. TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& = {}); + // Describe settings of topic's consumer. + TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& = {}); + //! Create read session. std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index ce18df18f4..77c91b8b98 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4166,8 +4166,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { res.Wait(); Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; UNIT_ASSERT(res.GetValue().IsSuccess()); + auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription()); Cerr << res2 << "\n"; + UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res2.DebugString()); { NYdb::NTopic::TCreateTopicSettings settings; @@ -4287,6 +4289,24 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { } { + NYdb::TDriverConfig driverCfg; + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort); + std::shared_ptr<NYdb::TDriver> ydbDriver(new NYdb::TDriver(driverCfg)); + auto topicClient = NYdb::NTopic::TTopicClient(*ydbDriver); + + auto res = topicClient.DescribeTopic("/Root/PQ/" + topic4, NYdb::NTopic::TDescribeTopicSettings{}.IncludeStats(true)); + res.Wait(); + Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; + UNIT_ASSERT(res.GetValue().IsSuccess()); + + auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription()); + Cerr << res2 << "\n"; + UNIT_ASSERT(res.GetValue().GetTopicDescription().GetPartitions().size() == 3); + UNIT_ASSERT(res.GetValue().GetTopicDescription().GetPartitions()[0].GetPartitionStats()); + UNIT_ASSERT(res.GetValue().GetTopicDescription().GetPartitions()[0].GetPartitionStats()->GetEndOffset() > 0); + } + + { Ydb::Topic::DescribeTopicRequest request; Ydb::Topic::DescribeTopicResponse response; request.set_path(TStringBuilder() << "/Root/PQ/" << topic4); @@ -4406,9 +4426,24 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR); } + { + NYdb::TDriverConfig driverCfg; + driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort); + std::shared_ptr<NYdb::TDriver> ydbDriver(new NYdb::TDriver(driverCfg)); + auto topicClient = NYdb::NTopic::TTopicClient(*ydbDriver); + auto res = topicClient.DescribeConsumer("/Root/PQ/" + topic4, "user", NYdb::NTopic::TDescribeConsumerSettings{}.IncludeStats(true)); + res.Wait(); + Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n"; + UNIT_ASSERT(res.GetValue().IsSuccess()); - + auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetConsumerDescription()); + Cerr << res2 << "\n"; + UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions().size() == 3); + UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions()[0].GetPartitionStats()); + UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions()[0].GetPartitionStats()->GetEndOffset() > 0); + UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions()[0].GetPartitionConsumerStats()); + } } Y_UNIT_TEST(SchemeOperationFirstClassCitizen) { |