diff options
| author | Nikolay Shestakov <[email protected]> | 2024-08-29 14:46:22 +0500 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-08-29 14:46:22 +0500 |
| commit | 6d41d83f92ba5ba9a045f2edd2cf41aae7187272 (patch) | |
| tree | 33083c81cad5397b16cffc4b101ea8e310e97ef4 | |
| parent | 10c360e34bed31d55401eae3366f9d6a9fd2ef81 (diff) | |
Delete deprecated code (ReadRules,..) (#8405)
| -rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.cpp | 3 | ||||
| -rw-r--r-- | ydb/core/persqueue/utils.cpp | 8 | ||||
| -rw-r--r-- | ydb/core/persqueue/utils.h | 3 | ||||
| -rw-r--r-- | ydb/services/lib/actors/pq_schema_actor.cpp | 95 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 24 | ||||
| -rw-r--r-- | ydb/services/persqueue_v1/topic_yql_ut.cpp | 85 |
7 files changed, 66 insertions, 156 deletions
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 7d37bcb0fa9..a7108b84c30 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1746,9 +1746,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf generation = curConfigVersion; } c.SetGeneration(generation); - if (ReadRuleCompatible()) { - cfg.AddReadRuleGenerations(generation); - } + cfg.AddReadRuleGenerations(generation); } } diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index b0f3c7339d1..0d999407d39 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -176,7 +176,8 @@ void CmdGetOffset(const ui32 partition, const TString& user, i64 expectedOffset, } } } - UNIT_ASSERT((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset); + UNIT_ASSERT_C((expectedOffset == -1 && !resp.HasOffset()) || (i64)resp.GetOffset() == expectedOffset, + "expectedOffset=" << expectedOffset << " resp.HasOffset()=" << resp.HasOffset() << " resp.GetOffset()=" << resp.GetOffset()); if (writeTime > 0) { UNIT_ASSERT(resp.HasWriteTimestampEstimateMS()); UNIT_ASSERT(resp.GetWriteTimestampEstimateMS() >= writeTime); diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index 1003a8003a0..2c1ad5c5f42 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -94,6 +94,14 @@ void Migrate(NKikimrPQ::TPQTabletConfig& config) { } consumer->SetImportant(IsImportantClient(config, consumer->GetName())); } + + config.ClearReadRules(); + config.ClearReadFromTimestampsMs(); + config.ClearConsumerFormatVersions(); + config.ClearConsumerCodecs(); + config.ClearReadRuleServiceTypes(); + config.ClearReadRuleVersions(); + config.ClearReadRuleGenerations(); } if (!config.PartitionsSize()) { diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index 7c42e70ff59..64b4a8cca38 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -21,9 +21,6 @@ TString SourceIdHash(const TString& sourceId); void Migrate(NKikimrPQ::TPQTabletConfig& config); -// This function required for marking the code which required remove after 25-1 -constexpr bool ReadRuleCompatible() { return true; } - bool HasConsumer(const NKikimrPQ::TPQTabletConfig& config, const TString& consumerName); size_t ConsumerCount(const NKikimrPQ::TPQTabletConfig& config); diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 9eadb2ab7e8..35a5dda561b 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -99,9 +99,6 @@ namespace NKikimr::NGRpcProxy::V1 { auto* consumer = config->AddConsumers(); consumer->SetName(consumerName); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRules(consumerName); - } if (rr.starting_message_timestamp_ms() < 0) { return TMsgPqCodes( @@ -110,9 +107,6 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetReadFromTimestampsMs(rr.starting_message_timestamp_ms()); - if (NPQ::ReadRuleCompatible()) { - config->AddReadFromTimestampsMs(rr.starting_message_timestamp_ms()); - } if (!Ydb::PersQueue::V1::TopicSettings::Format_IsValid((int)rr.supported_format()) || rr.supported_format() == 0) { return TMsgPqCodes( @@ -121,9 +115,6 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetFormatVersion(rr.supported_format() - 1); - if (NPQ::ReadRuleCompatible()) { - config->AddConsumerFormatVersions(rr.supported_format() - 1); - } if (rr.version() < 0) { return TMsgPqCodes( @@ -132,12 +123,8 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetVersion(rr.version()); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleVersions(rr.version()); - } auto* cct = consumer->MutableCodec(); - auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr; if (rr.supported_codecs().size() > MAX_SUPPORTED_CODECS_COUNT) { return TMsgPqCodes( TStringBuilder() << "supported_codecs count cannot be more than " @@ -156,17 +143,10 @@ namespace NKikimr::NGRpcProxy::V1 { cct->AddIds(codec - 1); cct->AddCodecs(codecName); - - if (NPQ::ReadRuleCompatible()) { - ct->CopyFrom(*cct); - } } if (rr.important()) { consumer->SetImportant(true); - if (NPQ::ReadRuleCompatible()) { - config->MutablePartitionConfig()->AddImportantClientId(consumerName); - } } if (!rr.service_type().empty()) { @@ -178,9 +158,6 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetServiceType(rr.service_type()); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleServiceTypes(rr.service_type()); - } } else { if (pqConfig.GetDisallowDefaultClientServiceType()) { return TMsgPqCodes( @@ -190,9 +167,6 @@ namespace NKikimr::NGRpcProxy::V1 { } const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); consumer->SetServiceType(defaultCientServiceType); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleServiceTypes(defaultCientServiceType); - } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); } @@ -238,9 +212,6 @@ namespace NKikimr::NGRpcProxy::V1 { auto* consumer = config->AddConsumers(); consumer->SetName(consumerName); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRules(consumerName); - } if (rr.read_from().seconds() < 0) { return TMsgPqCodes( @@ -249,19 +220,10 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetReadFromTimestampsMs(rr.read_from().seconds() * 1000); - if (NPQ::ReadRuleCompatible()) { - config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); - } - consumer->SetFormatVersion(0); - if (NPQ::ReadRuleCompatible()) { - config->AddConsumerFormatVersions(0); - } - - TString serviceType; const auto& defaultClientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); - serviceType = defaultClientServiceType; + TString serviceType = defaultClientServiceType; TString passwordHash = ""; bool hasPassword = false; @@ -318,17 +280,9 @@ namespace NKikimr::NGRpcProxy::V1 { } consumer->SetServiceType(serviceType); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleServiceTypes(serviceType); - } - consumer->SetVersion(version); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleVersions(version); - } auto* cct = consumer->MutableCodec(); - auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr; for(const auto& codec : rr.supported_codecs().codecs()) { if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) { @@ -339,10 +293,6 @@ namespace NKikimr::NGRpcProxy::V1 { } cct->AddIds(codec - 1); cct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); - - if (NPQ::ReadRuleCompatible()) { - ct->CopyFrom(*cct); - } } if (rr.important()) { @@ -350,9 +300,6 @@ namespace NKikimr::NGRpcProxy::V1 { return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } consumer->SetImportant(true); - if (NPQ::ReadRuleCompatible()) { - config->MutablePartitionConfig()->AddImportantClientId(consumerName); - } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); @@ -363,7 +310,7 @@ namespace NKikimr::NGRpcProxy::V1 { NKikimrPQ::TPQTabletConfig* config, const NKikimrPQ::TPQTabletConfig& originalConfig, const TString& consumerName, - const NKikimrPQ::TPQConfig& pqConfig + const NKikimrPQ::TPQConfig& /*pqConfig*/ ) { config->ClearReadRuleVersions(); config->ClearReadRules(); @@ -374,46 +321,8 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearReadRuleServiceTypes(); config->ClearConsumers(); - if (NPQ::ReadRuleCompatible()) { - for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { - if (importantConsumer != consumerName) { - config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); - } - } - } - bool removed = false; - if (NPQ::ReadRuleCompatible()) { - for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { - auto& readRule = originalConfig.GetReadRules(i); - - if (readRule == consumerName) { - removed = true; - continue; - } - - config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i)); - config->AddReadRules(readRule); - config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); - config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); - auto* ct = config->AddConsumerCodecs(); - for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) { - ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j)); - ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j)); - } - if (i < originalConfig.ReadRuleServiceTypesSize()) { - config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i)); - } else { - if (pqConfig.GetDisallowDefaultClientServiceType()) { - return TStringBuilder() << "service type cannot be empty for consumer '" - << readRule << "'"; - } - config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName()); - } - } - } - for (auto& consumer : originalConfig.GetConsumers()) { if (consumerName == consumer.GetName()) { removed = true; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index c47927afa0d..4966ba9330a 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -5028,7 +5028,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; MaxCountInPartition: 2147483647 MaxSizeInPartition: 234 LifetimeSeconds: 172800 - ImportantClientId: "consumer" SourceIdLifetimeSeconds: 1382400 WriteSpeedInBytesPerSecond: 123 BurstSize: 1000 @@ -5078,22 +5077,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Ident: "acc" Topic: "topic3" DC: "dc1" - ReadRules: "first-consumer" - ReadRules: "consumer" - ReadFromTimestampsMs: 11223344000 - ReadFromTimestampsMs: 111000 - ConsumerFormatVersions: 0 - ConsumerFormatVersions: 0 - ConsumerCodecs { - } - ConsumerCodecs { - Ids: 2 - Ids: 10004 - Codecs: "lzop" - Codecs: "CUSTOM" - } - ReadRuleServiceTypes: "data-streams" - ReadRuleServiceTypes: "data-streams" FormatVersion: 0 Codecs { Ids: 2 @@ -5101,8 +5084,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Codecs: "lzop" Codecs: "CUSTOM" } - ReadRuleVersions: 0 - ReadRuleVersions: 567 TopicPath: "/Root/PQ/rt3.dc1--acc--topic3" YdbDatabasePath: "/Root" Consumers { @@ -5113,7 +5094,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; } ServiceType: "data-streams" Version: 0 - Important: false } Consumers { Name: "consumer" @@ -6101,6 +6081,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; auto checkDescribe = [&](const TString& topic, const TVector<std::pair<TString, TString>>& readRules) { + Cerr << ">>>>> Check topic: " << topic << Endl; DescribeTopicRequest request; DescribeTopicResponse response; request.set_path(topic); @@ -6110,7 +6091,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; UNIT_ASSERT(status.ok()); DescribeTopicResult res; response.operation().result().UnpackTo(&res); - Cerr << response << "\n" << res << "\n"; + Cerr << ">>>>> Response: " << response << Endl; + Cerr << ">>>>> Result:" << res << "\n"; UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); UNIT_ASSERT_VALUES_EQUAL(res.settings().read_rules().size(), readRules.size()); diff --git a/ydb/services/persqueue_v1/topic_yql_ut.cpp b/ydb/services/persqueue_v1/topic_yql_ut.cpp index 8f698f703dc..8068d52ac92 100644 --- a/ydb/services/persqueue_v1/topic_yql_ut.cpp +++ b/ydb/services/persqueue_v1/topic_yql_ut.cpp @@ -84,40 +84,45 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { auto after = CheckPQChildrenSize("after create"); UNIT_ASSERT_VALUES_EQUAL(after, before + 1); auto pqGroup = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription() - .GetPersQueueGroup(); - const auto& describeAfterCreate = pqGroup.GetPQTabletConfig(); - Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString(); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetLifetimeSeconds(), 3600); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetBurstSize(), 100500); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9000); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().ImportantClientIdSize(), 1); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetImportantClientId(0), "c2"); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetMinPartitionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetMaxPartitionCount(), 5); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleThresholdSeconds(), 60); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 50); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 40); - UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(describeAfterCreate.GetPartitionStrategy().GetPartitionStrategyType()), static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT)); + .GetPersQueueGroup(); + + { + const auto& describe = pqGroup.GetPQTabletConfig(); + Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString(); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetLifetimeSeconds(), 3600); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetBurstSize(), 100500); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9000); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMinPartitionCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMaxPartitionCount(), 5); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleThresholdSeconds(), 60); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 50); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 40); + UNIT_ASSERT_VALUES_EQUAL(static_cast<int>(describe.GetPartitionStrategy().GetPartitionStrategyType()), static_cast<int>(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT)); + UNIT_ASSERT_VALUES_EQUAL(pqGroup.GetTotalGroupCount(), 2); + + UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2); - auto& codecs = describeAfterCreate.GetConsumerCodecs(1); - UNIT_ASSERT_VALUES_EQUAL(codecs.IdsSize(), 3); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetCodecs().IdsSize(), 2); - UNIT_ASSERT_VALUES_EQUAL(pqGroup.GetTotalGroupCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetReadFromTimestampsMs(1), 100 * 1000); + auto& consumer0 = describe.GetConsumers()[0]; + UNIT_ASSERT_VALUES_EQUAL(consumer0.GetName(), "c1"); + UNIT_ASSERT_VALUES_EQUAL(consumer0.GetImportant(), false); - auto expectedDescr = describeAfterCreate; + auto& consumer1 = describe.GetConsumers()[1]; + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetName(), "c2"); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), true); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 3); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 100 * 1000); + } + auto expectedDescr = pqGroup.GetPQTabletConfig(); { auto partCfg = expectedDescr.MutablePartitionConfig(); partCfg->SetLifetimeSeconds(7200); partCfg->SetBurstSize(100501); partCfg->SetWriteSpeedInBytesPerSecond(9001); - auto* rtfs = expectedDescr.MutableReadFromTimestampsMs(); - rtfs->Set(1, 1609462861000); expectedDescr.MutablePartitionStrategy()->SetMinPartitionCount(3); - expectedDescr.MutableConsumers(1)->SetReadFromTimestampsMs(1609462861000); } + const char *query2 = R"__( ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1` ALTER CONSUMER c2 SET (read_from = Timestamp('2021-01-01T01:01:01Z')), @@ -150,20 +155,30 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { server.AnnoyingClient->RunYqlSchemeQuery(query3); pqGroup2 = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription() - .GetPersQueueGroup(); - const auto& descr3 = pqGroup2.GetPQTabletConfig(); - Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString(); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetLifetimeSeconds(), 7200); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetBurstSize(), 100501); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9001); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().ImportantClientIdSize(), 1); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetImportantClientId(0), "c3"); + .GetPersQueueGroup(); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetConsumerCodecs(0).IdsSize(), 2); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetCodecs().IdsSize(), 1); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetReadFromTimestampsMs(0), 1609462861000); - UNIT_ASSERT_VALUES_EQUAL(pqGroup2.GetTotalGroupCount(), 2); + { + const auto& describe = pqGroup2.GetPQTabletConfig(); + Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup2.DebugString(); + UNIT_ASSERT_VALUES_EQUAL(pqGroup2.GetTotalGroupCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetLifetimeSeconds(), 7200); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetBurstSize(), 100501); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9001); + + UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2); + auto& consumer1 = describe.GetConsumers(0); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetName(), "c2"); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), false); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 1609462861000); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 2); + + auto& consumer2 = describe.GetConsumers(1); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetName(), "c3"); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetImportant(), true); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetCodec().IdsSize(), 0); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetReadFromTimestampsMs(), 0); + } //1609462861 } |
