summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <[email protected]>2024-08-29 14:46:22 +0500
committerGitHub <[email protected]>2024-08-29 14:46:22 +0500
commit6d41d83f92ba5ba9a045f2edd2cf41aae7187272 (patch)
tree33083c81cad5397b16cffc4b101ea8e310e97ef4
parent10c360e34bed31d55401eae3366f9d6a9fd2ef81 (diff)
Delete deprecated code (ReadRules,..) (#8405)
-rw-r--r--ydb/core/persqueue/pq_impl.cpp4
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp3
-rw-r--r--ydb/core/persqueue/utils.cpp8
-rw-r--r--ydb/core/persqueue/utils.h3
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp95
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp24
-rw-r--r--ydb/services/persqueue_v1/topic_yql_ut.cpp85
7 files changed, 66 insertions, 156 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 7d37bcb0fa9..a7108b84c30 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1746,9 +1746,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
generation = curConfigVersion;
}
c.SetGeneration(generation);
- if (ReadRuleCompatible()) {
- cfg.AddReadRuleGenerations(generation);
- }
+ cfg.AddReadRuleGenerations(generation);
}
}
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index b0f3c7339d1..0d999407d39 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -176,7 +176,8 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset,
}
}
}
- UNIT_ASSERT((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset);
+ UNIT_ASSERT_C((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset,
+ "expectedOffset=" << expectedOffset << " resp.HasOffset()=" << resp.HasOffset() << " resp.GetOffset()=" << resp.GetOffset());
if (writeTime > 0) {
UNIT_ASSERT(resp.HasWriteTimestampEstimateMS());
UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime);
diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp
index 1003a8003a0..2c1ad5c5f42 100644
--- a/ydb/core/persqueue/utils.cpp
+++ b/ydb/core/persqueue/utils.cpp
@@ -94,6 +94,14 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) {
}
consumer->SetImportant(IsImportantClient(config, consumer->GetName()));
}
+
+ config.ClearReadRules();
+ config.ClearReadFromTimestampsMs();
+ config.ClearConsumerFormatVersions();
+ config.ClearConsumerCodecs();
+ config.ClearReadRuleServiceTypes();
+ config.ClearReadRuleVersions();
+ config.ClearReadRuleGenerations();
}
if (!config.PartitionsSize()) {
diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h
index 7c42e70ff59..64b4a8cca38 100644
--- a/ydb/core/persqueue/utils.h
+++ b/ydb/core/persqueue/utils.h
@@ -21,9 +21,6 @@ TString SourceIdHash(const TString& sourceId);
void Migrate(NKikimrPQ::TPQTabletConfig& config);
-// This function required for marking the code which required remove after 25-1
-constexpr bool ReadRuleCompatible() { return true; }
-
bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName);
size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config);
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index 9eadb2ab7e8..35a5dda561b 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -99,9 +99,6 @@ namespace NKikimr::NGRpcProxy::V1 {
auto* consumer = config->AddConsumers();
consumer->SetName(consumerName);
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadRules(consumerName);
- }
if (rr.starting_message_timestamp_ms() < 0) {
return TMsgPqCodes(
@@ -110,9 +107,6 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetReadFromTimestampsMs(rr.starting_message_timestamp_ms());
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms());
- }
if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) {
return TMsgPqCodes(
@@ -121,9 +115,6 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetFormatVersion(rr.supported_format() - 1);
- if (NPQ::ReadRuleCompatible()) {
- config->AddConsumerFormatVersions(rr.supported_format() - 1);
- }
if (rr.version() < 0) {
return TMsgPqCodes(
@@ -132,12 +123,8 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetVersion(rr.version());
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadRuleVersions(rr.version());
- }
auto* cct = consumer->MutableCodec();
- auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr;
if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) {
return TMsgPqCodes(
TStringBuilder() << "supported_codecs count cannot be more than "
@@ -156,17 +143,10 @@ namespace NKikimr::NGRpcProxy::V1 {
cct->AddIds(codec - 1);
cct->AddCodecs(codecName);
-
- if (NPQ::ReadRuleCompatible()) {
- ct->CopyFrom(*cct);
- }
}
if (rr.important()) {
consumer->SetImportant(true);
- if (NPQ::ReadRuleCompatible()) {
- config->MutablePartitionConfig()->AddImportantClientId(consumerName);
- }
}
if (!rr.service_type().empty()) {
@@ -178,9 +158,6 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetServiceType(rr.service_type());
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadRuleServiceTypes(rr.service_type());
- }
} else {
if (pqConfig.GetDisallowDefaultClientServiceType()) {
return TMsgPqCodes(
@@ -190,9 +167,6 @@ namespace NKikimr::NGRpcProxy::V1 {
}
const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName();
consumer->SetServiceType(defaultCientServiceType);
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadRuleServiceTypes(defaultCientServiceType);
- }
}
return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK);
}
@@ -238,9 +212,6 @@ namespace NKikimr::NGRpcProxy::V1 {
auto* consumer = config->AddConsumers();
consumer->SetName(consumerName);
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadRules(consumerName);
- }
if (rr.read_from().seconds() < 0) {
return TMsgPqCodes(
@@ -249,19 +220,10 @@ namespace NKikimr::NGRpcProxy::V1 {
);
}
consumer->SetReadFromTimestampsMs(rr.read_from().seconds() * 1000);
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000);
- }
-
consumer->SetFormatVersion(0);
- if (NPQ::ReadRuleCompatible()) {
- config->AddConsumerFormatVersions(0);
- }
-
- TString serviceType;
const auto& defaultClientServiceType = pqConfig.GetDefaultClientServiceType().GetName();
- serviceType = defaultClientServiceType;
+ TString serviceType = defaultClientServiceType;
TString passwordHash = "";
bool hasPassword = false;
@@ -318,17 +280,9 @@ namespace NKikimr::NGRpcProxy::V1 {
}
consumer->SetServiceType(serviceType);
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadRuleServiceTypes(serviceType);
- }
-
consumer->SetVersion(version);
- if (NPQ::ReadRuleCompatible()) {
- config->AddReadRuleVersions(version);
- }
auto* cct = consumer->MutableCodec();
- auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr;
for(const auto& codec : rr.supported_codecs().codecs()) {
if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) {
@@ -339,10 +293,6 @@ namespace NKikimr::NGRpcProxy::V1 {
}
cct->AddIds(codec - 1);
cct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM");
-
- if (NPQ::ReadRuleCompatible()) {
- ct->CopyFrom(*cct);
- }
}
if (rr.important()) {
@@ -350,9 +300,6 @@ namespace NKikimr::NGRpcProxy::V1 {
return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT);
}
consumer->SetImportant(true);
- if (NPQ::ReadRuleCompatible()) {
- config->MutablePartitionConfig()->AddImportantClientId(consumerName);
- }
}
return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK);
@@ -363,7 +310,7 @@ namespace NKikimr::NGRpcProxy::V1 {
NKikimrPQ::TPQTabletConfig* config,
const NKikimrPQ::TPQTabletConfig& originalConfig,
const TString& consumerName,
- const NKikimrPQ::TPQConfig& pqConfig
+ const NKikimrPQ::TPQConfig& /*pqConfig*/
) {
config->ClearReadRuleVersions();
config->ClearReadRules();
@@ -374,46 +321,8 @@ namespace NKikimr::NGRpcProxy::V1 {
config->ClearReadRuleServiceTypes();
config->ClearConsumers();
- if (NPQ::ReadRuleCompatible()) {
- for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) {
- if (importantConsumer != consumerName) {
- config->MutablePartitionConfig()->AddImportantClientId(importantConsumer);
- }
- }
- }
-
bool removed = false;
- if (NPQ::ReadRuleCompatible()) {
- for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) {
- auto& readRule = originalConfig.GetReadRules(i);
-
- if (readRule == consumerName) {
- removed = true;
- continue;
- }
-
- config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i));
- config->AddReadRules(readRule);
- config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i));
- config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i));
- auto* ct = config->AddConsumerCodecs();
- for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) {
- ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j));
- ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j));
- }
- if (i < originalConfig.ReadRuleServiceTypesSize()) {
- config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i));
- } else {
- if (pqConfig.GetDisallowDefaultClientServiceType()) {
- return TStringBuilder() << "service type cannot be empty for consumer '"
- << readRule << "'";
- }
- config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName());
- }
- }
- }
-
for (auto& consumer : originalConfig.GetConsumers()) {
if (consumerName == consumer.GetName()) {
removed = true;
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index c47927afa0d..4966ba9330a 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -5028,7 +5028,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
MaxCountInPartition: 2147483647
MaxSizeInPartition: 234
LifetimeSeconds: 172800
- ImportantClientId: "consumer"
SourceIdLifetimeSeconds: 1382400
WriteSpeedInBytesPerSecond: 123
BurstSize: 1000
@@ -5078,22 +5077,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
Ident: "acc"
Topic: "topic3"
DC: "dc1"
- ReadRules: "first-consumer"
- ReadRules: "consumer"
- ReadFromTimestampsMs: 11223344000
- ReadFromTimestampsMs: 111000
- ConsumerFormatVersions: 0
- ConsumerFormatVersions: 0
- ConsumerCodecs {
- }
- ConsumerCodecs {
- Ids: 2
- Ids: 10004
- Codecs: "lzop"
- Codecs: "CUSTOM"
- }
- ReadRuleServiceTypes: "data-streams"
- ReadRuleServiceTypes: "data-streams"
FormatVersion: 0
Codecs {
Ids: 2
@@ -5101,8 +5084,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
Codecs: "lzop"
Codecs: "CUSTOM"
}
- ReadRuleVersions: 0
- ReadRuleVersions: 567
TopicPath: "/Root/PQ/rt3.dc1--acc--topic3"
YdbDatabasePath: "/Root"
Consumers {
@@ -5113,7 +5094,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
}
ServiceType: "data-streams"
Version: 0
- Important: false
}
Consumers {
Name: "consumer"
@@ -6101,6 +6081,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
auto checkDescribe = [&](const TString& topic, const TVector<std::pair<TString, TString>>& readRules) {
+ Cerr << ">>>>> Check topic: " << topic << Endl;
DescribeTopicRequest request;
DescribeTopicResponse response;
request.set_path(topic);
@@ -6110,7 +6091,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
UNIT_ASSERT(status.ok());
DescribeTopicResult res;
response.operation().result().UnpackTo(&res);
- Cerr << response << "\n" << res << "\n";
+ Cerr << ">>>>> Response: " << response << Endl;
+ Cerr << ">>>>> Result:" << res << "\n";
UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(res.settings().read_rules().size(), readRules.size());
diff --git a/ydb/services/persqueue_v1/topic_yql_ut.cpp b/ydb/services/persqueue_v1/topic_yql_ut.cpp
index 8f698f703dc..8068d52ac92 100644
--- a/ydb/services/persqueue_v1/topic_yql_ut.cpp
+++ b/ydb/services/persqueue_v1/topic_yql_ut.cpp
@@ -84,40 +84,45 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
auto after = CheckPQChildrenSize("after create");
UNIT_ASSERT_VALUES_EQUAL(after, before + 1);
auto pqGroup = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription()
- .GetPersQueueGroup();
- const auto& describeAfterCreate = pqGroup.GetPQTabletConfig();
- Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString();
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetLifetimeSeconds(), 3600);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetBurstSize(), 100500);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9000);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().ImportantClientIdSize(), 1);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetImportantClientId(0), "c2");
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetMinPartitionCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetMaxPartitionCount(), 5);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleThresholdSeconds(), 60);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 50);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 40);
- UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(describeAfterCreate.GetPartitionStrategy().GetPartitionStrategyType()), static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT));
+ .GetPersQueueGroup();
+
+ {
+ const auto& describe = pqGroup.GetPQTabletConfig();
+ Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetLifetimeSeconds(), 3600);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetBurstSize(), 100500);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9000);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMinPartitionCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMaxPartitionCount(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleThresholdSeconds(), 60);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 50);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 40);
+ UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(describe.GetPartitionStrategy().GetPartitionStrategyType()), static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT));
+ UNIT_ASSERT_VALUES_EQUAL(pqGroup.GetTotalGroupCount(), 2);
+
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2);
- auto& codecs = describeAfterCreate.GetConsumerCodecs(1);
- UNIT_ASSERT_VALUES_EQUAL(codecs.IdsSize(), 3);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetCodecs().IdsSize(), 2);
- UNIT_ASSERT_VALUES_EQUAL(pqGroup.GetTotalGroupCount(), 2);
- UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetReadFromTimestampsMs(1), 100 * 1000);
+ auto& consumer0 = describe.GetConsumers()[0];
+ UNIT_ASSERT_VALUES_EQUAL(consumer0.GetName(), "c1");
+ UNIT_ASSERT_VALUES_EQUAL(consumer0.GetImportant(), false);
- auto expectedDescr = describeAfterCreate;
+ auto& consumer1 = describe.GetConsumers()[1];
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetName(), "c2");
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), true);
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 100 * 1000);
+ }
+ auto expectedDescr = pqGroup.GetPQTabletConfig();
{
auto partCfg = expectedDescr.MutablePartitionConfig();
partCfg->SetLifetimeSeconds(7200);
partCfg->SetBurstSize(100501);
partCfg->SetWriteSpeedInBytesPerSecond(9001);
- auto* rtfs = expectedDescr.MutableReadFromTimestampsMs();
- rtfs->Set(1, 1609462861000);
expectedDescr.MutablePartitionStrategy()->SetMinPartitionCount(3);
-
expectedDescr.MutableConsumers(1)->SetReadFromTimestampsMs(1609462861000);
}
+
const char *query2 = R"__(
ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1`
ALTER CONSUMER c2 SET (read_from = Timestamp('2021-01-01T01:01:01Z')),
@@ -150,20 +155,30 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) {
server.AnnoyingClient->RunYqlSchemeQuery(query3);
pqGroup2 = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription()
- .GetPersQueueGroup();
- const auto& descr3 = pqGroup2.GetPQTabletConfig();
- Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString();
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetLifetimeSeconds(), 7200);
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetBurstSize(), 100501);
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9001);
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().ImportantClientIdSize(), 1);
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetImportantClientId(0), "c3");
+ .GetPersQueueGroup();
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetConsumerCodecs(0).IdsSize(), 2);
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetCodecs().IdsSize(), 1);
- UNIT_ASSERT_VALUES_EQUAL(descr3.GetReadFromTimestampsMs(0), 1609462861000);
- UNIT_ASSERT_VALUES_EQUAL(pqGroup2.GetTotalGroupCount(), 2);
+ {
+ const auto& describe = pqGroup2.GetPQTabletConfig();
+ Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup2.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL(pqGroup2.GetTotalGroupCount(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetLifetimeSeconds(), 7200);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetBurstSize(), 100501);
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9001);
+
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2);
+ auto& consumer1 = describe.GetConsumers(0);
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetName(), "c2");
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), false);
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 1609462861000);
+ UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 2);
+
+ auto& consumer2 = describe.GetConsumers(1);
+ UNIT_ASSERT_VALUES_EQUAL(consumer2.GetName(), "c3");
+ UNIT_ASSERT_VALUES_EQUAL(consumer2.GetImportant(), true);
+ UNIT_ASSERT_VALUES_EQUAL(consumer2.GetCodec().IdsSize(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(consumer2.GetReadFromTimestampsMs(), 0);
+ }
//1609462861
}