aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-12-02 15:47:41 +0300
committeralexnick <alexnick@ydb.tech>2022-12-02 15:47:41 +0300
commit37960f794987f2412b440c0f8cd8ccb9c8fa7de4 (patch)
tree1d9615d352dd09458dc1485d949b4638c5118b3e
parentce0f540c0f9598245163d4826c6a2560cc17142e (diff)
downloadydb-37960f794987f2412b440c0f8cd8ccb9c8fa7de4.tar.gz
support of stat in cpp sdk
progress progress progress progress provide codec in get records
-rw-r--r--ydb/public/sdk/cpp/client/ydb_proto/accessor.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp87
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h38
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h75
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp37
6 files changed, 239 insertions, 3 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
index 756c6f792c..5ffcf39b29 100644
--- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
+++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h
@@ -40,6 +40,7 @@ public:
static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats);
static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription);
static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription);
+ static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription);
static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult);
static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
index f6abfca822..eddcade51c 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
@@ -27,6 +27,16 @@ const TTopicDescription& TDescribeTopicResult::GetTopicDescription() const {
return TopicDescription_;
}
+TDescribeConsumerResult::TDescribeConsumerResult(TStatus&& status, Ydb::Topic::DescribeConsumerResult&& result)
+ : TStatus(std::move(status))
+ , ConsumerDescription_(std::move(result))
+{
+}
+
+const TConsumerDescription& TDescribeConsumerResult::GetConsumerDescription() const {
+ return ConsumerDescription_;
+}
+
TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result)
: Proto_(std::move(result))
, PartitioningSettings_(Proto_.partitioning_settings())
@@ -54,6 +64,16 @@ TTopicDescription::TTopicDescription(Ydb::Topic::DescribeTopicResult&& result)
}
}
+TConsumerDescription::TConsumerDescription(Ydb::Topic::DescribeConsumerResult&& result)
+ : Proto_(std::move(result))
+ , Consumer_(result.consumer())
+{
+ for (const auto& part : Proto_.partitions()) {
+ Partitions_.emplace_back(part);
+ }
+}
+
+
TConsumer::TConsumer(const Ydb::Topic::Consumer& consumer)
: ConsumerName_(consumer.name())
, Important_(consumer.important())
@@ -95,7 +115,11 @@ ui32 TTopicDescription::GetTotalPartitionsCount() const {
return Partitions_.size();
}
-const TVector<TPartitionInfo>& TTopicDescription::GetPartitions() const {
+const TVector<TPartitionInfo>& TTopicDescription::GetPartitions() const {
+ return Partitions_;
+}
+
+const TVector<TPartitionInfo>& TConsumerDescription::GetPartitions() const {
return Partitions_;
}
@@ -140,6 +164,10 @@ const Ydb::Topic::DescribeTopicResult& TTopicDescription::GetProto() const {
return Proto_;
}
+const Ydb::Topic::DescribeConsumerResult& TConsumerDescription::GetProto() const {
+ return Proto_;
+}
+
const TString& TTopicDescription::GetOwner() const {
return Owner_;
}
@@ -165,9 +193,50 @@ ui64 TPartitioningSettings::GetPartitionCountLimit() const {
return PartitionCountLimit_;
}
+TPartitionStats::TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats)
+ : StartOffset_(partitionStats.partition_offsets().start())
+ , EndOffset_(partitionStats.partition_offsets().end())
+{}
+
+ui64 TPartitionStats::GetStartOffset() const {
+ return StartOffset_;
+}
+
+ui64 TPartitionStats::GetEndOffset() const {
+ return EndOffset_;
+}
+
+TPartitionConsumerStats::TPartitionConsumerStats(const Ydb::Topic::DescribeConsumerResult::PartitionConsumerStats& partitionStats)
+ : CommittedOffset_(partitionStats.committed_offset())
+{}
+
+ui64 TPartitionConsumerStats::GetCommittedOffset() const {
+ return CommittedOffset_;
+}
+
+
+
TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo)
: PartitionId_(partitionInfo.partition_id())
, Active_(partitionInfo.active())
+ , PartitionStats_()
+{
+ for (const auto& partId : partitionInfo.child_partition_ids()) {
+ ChildPartitionIds_.push_back(partId);
+ }
+
+ for (const auto& partId : partitionInfo.parent_partition_ids()) {
+ ParentPartitionIds_.push_back(partId);
+ }
+ if (partitionInfo.has_partition_stats()) {
+ PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
+ }
+}
+
+TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo)
+ : PartitionId_(partitionInfo.partition_id())
+ , Active_(partitionInfo.active())
+ , PartitionStats_()
{
for (const auto& partId : partitionInfo.child_partition_ids()) {
ChildPartitionIds_.push_back(partId);
@@ -176,6 +245,18 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI
for (const auto& partId : partitionInfo.parent_partition_ids()) {
ParentPartitionIds_.push_back(partId);
}
+ if (partitionInfo.has_partition_stats()) {
+ PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
+ PartitionConsumerStats_ = TPartitionConsumerStats{partitionInfo.partition_consumer_stats()};
+ }
+}
+
+const TMaybe<TPartitionStats>& TPartitionInfo::GetPartitionStats() const {
+ return PartitionStats_;
+}
+
+const TMaybe<TPartitionConsumerStats>& TPartitionInfo::GetPartitionConsumerStats() const {
+ return PartitionConsumerStats_;
}
bool TPartitionInfo::GetActive() const {
@@ -204,6 +285,10 @@ TAsyncDescribeTopicResult TTopicClient::DescribeTopic(const TString& path, const
return Impl_->DescribeTopic(path, settings);
}
+TAsyncDescribeConsumerResult TTopicClient::DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& settings) {
+ return Impl_->DescribeConsumer(path, consumer, settings);
+}
+
IRetryPolicy::TPtr IRetryPolicy::GetDefaultPolicy() {
static IRetryPolicy::TPtr policy = GetExponentialBackoffPolicy();
return policy;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
index de512c6ebb..563205f60a 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h
@@ -171,6 +171,10 @@ public:
auto request = MakeOperationRequest<Ydb::Topic::DescribeTopicRequest>(settings);
request.set_path(path);
+ if (settings.IncludeStats_) {
+ request.set_include_stats(true);
+ }
+
auto promise = NThreading::NewPromise<TDescribeTopicResult>();
auto extractor = [promise]
@@ -195,6 +199,40 @@ public:
return promise.GetFuture();
}
+ TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& settings) {
+ auto request = MakeOperationRequest<Ydb::Topic::DescribeConsumerRequest>(settings);
+ request.set_path(path);
+ request.set_consumer(consumer);
+
+ if (settings.IncludeStats_) {
+ request.set_include_stats(true);
+ }
+
+ auto promise = NThreading::NewPromise<TDescribeConsumerResult>();
+
+ auto extractor = [promise]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ Ydb::Topic::DescribeConsumerResult result;
+ if (any) {
+ any->UnpackTo(&result);
+ }
+
+ TDescribeConsumerResult val(TStatus(std::move(status)), std::move(result));
+ promise.SetValue(std::move(val));
+ };
+
+ Connections_->RunDeferred<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse>(
+ std::move(request),
+ extractor,
+ &Ydb::Topic::V1::TopicService::Stub::AsyncDescribeConsumer,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return promise.GetFuture();
+ }
+
// Runtime API.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleWriteSession(const TWriteSessionSettings& settings);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
index b0e7353d25..9ae37fc3d0 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/proto_accessor.cpp
@@ -5,6 +5,10 @@ namespace NYdb {
return topicDescription.GetProto();
}
+ const Ydb::Topic::DescribeConsumerResult& TProtoAccessor::GetProto(const NTopic::TConsumerDescription& consumerDescription) {
+ return consumerDescription.GetProto();
+ }
+
Ydb::Topic::MeteringMode TProtoAccessor::GetProto(NTopic::EMeteringMode mode) {
switch (mode) {
case NTopic::EMeteringMode::Unspecified:
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index b5470532f2..fce6adf49a 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -62,19 +62,46 @@ private:
TVector<ECodec> SupportedCodecs_;
};
+class TPartitionStats {
+public:
+ TPartitionStats(const Ydb::Topic::PartitionStats& partitionStats);
+
+ ui64 GetStartOffset() const;
+ ui64 GetEndOffset() const;
+private:
+ ui64 StartOffset_;
+ ui64 EndOffset_;
+};
+
+class TPartitionConsumerStats {
+public:
+ TPartitionConsumerStats(const Ydb::Topic::DescribeConsumerResult::PartitionConsumerStats& partitionStats);
+ ui64 GetCommittedOffset() const;
+
+private:
+ ui64 CommittedOffset_;
+};
+
class TPartitionInfo {
public:
TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionInfo& partitionInfo);
+ TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo);
+
ui64 GetPartitionId() const;
bool GetActive() const;
const TVector<ui64> GetChildPartitionIds() const;
const TVector<ui64> GetParentPartitionIds() const;
+ const TMaybe<TPartitionStats>& GetPartitionStats() const;
+ const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const;
+
private:
ui64 PartitionId_;
bool Active_;
TVector<ui64> ChildPartitionIds_;
TVector<ui64> ParentPartitionIds_;
+ TMaybe<TPartitionStats> PartitionStats_;
+ TMaybe<TPartitionConsumerStats> PartitionConsumerStats_;
};
@@ -149,6 +176,27 @@ private:
};
+class TConsumerDescription {
+ friend class NYdb::TProtoAccessor;
+
+public:
+ TConsumerDescription(Ydb::Topic::DescribeConsumerResult&& desc);
+
+ const TVector<TPartitionInfo>& GetPartitions() const;
+
+ const TConsumer& GetConsumer() const;
+
+private:
+
+ const Ydb::Topic::DescribeConsumerResult& GetProto() const;
+
+
+ const Ydb::Topic::DescribeConsumerResult Proto_;
+ TVector<TPartitionInfo> Partitions_;
+ TConsumer Consumer_;
+};
+
+
// Result for describe resource request.
struct TDescribeTopicResult : public TStatus {
friend class NYdb::TProtoAccessor;
@@ -162,7 +210,22 @@ private:
TTopicDescription TopicDescription_;
};
+// Result for describe resource request.
+struct TDescribeConsumerResult : public TStatus {
+ friend class NYdb::TProtoAccessor;
+
+
+ TDescribeConsumerResult(TStatus&& status, Ydb::Topic::DescribeConsumerResult&& result);
+
+ const TConsumerDescription& GetConsumerDescription() const;
+
+private:
+ TConsumerDescription ConsumerDescription_;
+};
+
+
using TAsyncDescribeTopicResult = NThreading::TFuture<TDescribeTopicResult>;
+using TAsyncDescribeConsumerResult = NThreading::TFuture<TDescribeConsumerResult>;
template <class TSettings>
class TAlterAttributesBuilderImpl {
@@ -418,7 +481,14 @@ struct TAlterTopicSettings : public TOperationRequestSettings<TAlterTopicSetting
struct TDropTopicSettings : public TOperationRequestSettings<TDropTopicSettings> {};
// Settings for describe resource request.
-struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> {};
+struct TDescribeTopicSettings : public TOperationRequestSettings<TDescribeTopicSettings> {
+ FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
+};
+
+// Settings for describe resource request.
+struct TDescribeConsumerSettings : public TOperationRequestSettings<TDescribeConsumerSettings> {
+ FLUENT_SETTING_DEFAULT(bool, IncludeStats, false);
+};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1465,6 +1535,9 @@ public:
// Describe settings of topic.
TAsyncDescribeTopicResult DescribeTopic(const TString& path, const TDescribeTopicSettings& = {});
+ // Describe settings of topic's consumer.
+ TAsyncDescribeConsumerResult DescribeConsumer(const TString& path, const TString& consumer, const TDescribeConsumerSettings& = {});
+
//! Create read session.
std::shared_ptr<IReadSession> CreateReadSession(const TReadSessionSettings& settings);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index ce18df18f4..77c91b8b98 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4166,8 +4166,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
res.Wait();
Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
UNIT_ASSERT(res.GetValue().IsSuccess());
+
auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription());
Cerr << res2 << "\n";
+
UNIT_ASSERT_VALUES_EQUAL(descrRes.DebugString(), res2.DebugString());
{
NYdb::NTopic::TCreateTopicSettings settings;
@@ -4287,6 +4289,24 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
{
+ NYdb::TDriverConfig driverCfg;
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort);
+ std::shared_ptr<NYdb::TDriver> ydbDriver(new NYdb::TDriver(driverCfg));
+ auto topicClient = NYdb::NTopic::TTopicClient(*ydbDriver);
+
+ auto res = topicClient.DescribeTopic("/Root/PQ/" + topic4, NYdb::NTopic::TDescribeTopicSettings{}.IncludeStats(true));
+ res.Wait();
+ Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
+ UNIT_ASSERT(res.GetValue().IsSuccess());
+
+ auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetTopicDescription());
+ Cerr << res2 << "\n";
+ UNIT_ASSERT(res.GetValue().GetTopicDescription().GetPartitions().size() == 3);
+ UNIT_ASSERT(res.GetValue().GetTopicDescription().GetPartitions()[0].GetPartitionStats());
+ UNIT_ASSERT(res.GetValue().GetTopicDescription().GetPartitions()[0].GetPartitionStats()->GetEndOffset() > 0);
+ }
+
+ {
Ydb::Topic::DescribeTopicRequest request;
Ydb::Topic::DescribeTopicResponse response;
request.set_path(TStringBuilder() << "/Root/PQ/" << topic4);
@@ -4406,9 +4426,24 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR);
}
+ {
+ NYdb::TDriverConfig driverCfg;
+ driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << server.GrpcPort);
+ std::shared_ptr<NYdb::TDriver> ydbDriver(new NYdb::TDriver(driverCfg));
+ auto topicClient = NYdb::NTopic::TTopicClient(*ydbDriver);
+ auto res = topicClient.DescribeConsumer("/Root/PQ/" + topic4, "user", NYdb::NTopic::TDescribeConsumerSettings{}.IncludeStats(true));
+ res.Wait();
+ Cerr << res.GetValue().IsSuccess() << " " << res.GetValue().GetIssues().ToString() << "\n";
+ UNIT_ASSERT(res.GetValue().IsSuccess());
-
+ auto res2 = NYdb::TProtoAccessor::GetProto(res.GetValue().GetConsumerDescription());
+ Cerr << res2 << "\n";
+ UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions().size() == 3);
+ UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions()[0].GetPartitionStats());
+ UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions()[0].GetPartitionStats()->GetEndOffset() > 0);
+ UNIT_ASSERT(res.GetValue().GetConsumerDescription().GetPartitions()[0].GetPartitionConsumerStats());
+ }
}
Y_UNIT_TEST(SchemeOperationFirstClassCitizen) {