From 6d41d83f92ba5ba9a045f2edd2cf41aae7187272 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 29 Aug 2024 14:46:22 +0500 Subject: Delete deprecated code (ReadRules,..) (#8405) --- ydb/core/persqueue/pq_impl.cpp | 4 +- ydb/core/persqueue/ut/common/pq_ut_common.cpp | 3 +- ydb/core/persqueue/utils.cpp | 8 +++ ydb/core/persqueue/utils.h | 3 - ydb/services/lib/actors/pq_schema_actor.cpp | 95 +-------------------------- ydb/services/persqueue_v1/persqueue_ut.cpp | 24 +------ ydb/services/persqueue_v1/topic_yql_ut.cpp | 91 ++++++++++++++----------- 7 files changed, 69 insertions(+), 159 deletions(-) diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 7d37bcb0fa9..a7108b84c30 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1746,9 +1746,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtrAddIds(codec - 1); cct->AddCodecs(codecName); - - if (NPQ::ReadRuleCompatible()) { - ct->CopyFrom(*cct); - } } if (rr.important()) { consumer->SetImportant(true); - if (NPQ::ReadRuleCompatible()) { - config->MutablePartitionConfig()->AddImportantClientId(consumerName); - } } if (!rr.service_type().empty()) { @@ -178,9 +158,6 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetServiceType(rr.service_type()); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleServiceTypes(rr.service_type()); - } } else { if (pqConfig.GetDisallowDefaultClientServiceType()) { return TMsgPqCodes( @@ -190,9 +167,6 @@ namespace NKikimr::NGRpcProxy::V1 { } const auto& defaultCientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); consumer->SetServiceType(defaultCientServiceType); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleServiceTypes(defaultCientServiceType); - } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); } @@ -238,9 +212,6 @@ namespace NKikimr::NGRpcProxy::V1 { auto* consumer = config->AddConsumers(); consumer->SetName(consumerName); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRules(consumerName); - } if (rr.read_from().seconds() < 0) { return TMsgPqCodes( @@ -249,19 +220,10 @@ namespace NKikimr::NGRpcProxy::V1 { ); } consumer->SetReadFromTimestampsMs(rr.read_from().seconds() * 1000); - if (NPQ::ReadRuleCompatible()) { - config->AddReadFromTimestampsMs(rr.read_from().seconds() * 1000); - } - consumer->SetFormatVersion(0); - if (NPQ::ReadRuleCompatible()) { - config->AddConsumerFormatVersions(0); - } - - TString serviceType; const auto& defaultClientServiceType = pqConfig.GetDefaultClientServiceType().GetName(); - serviceType = defaultClientServiceType; + TString serviceType = defaultClientServiceType; TString passwordHash = ""; bool hasPassword = false; @@ -318,17 +280,9 @@ namespace NKikimr::NGRpcProxy::V1 { } consumer->SetServiceType(serviceType); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleServiceTypes(serviceType); - } - consumer->SetVersion(version); - if (NPQ::ReadRuleCompatible()) { - config->AddReadRuleVersions(version); - } auto* cct = consumer->MutableCodec(); - auto* ct = NPQ::ReadRuleCompatible() ? config->AddConsumerCodecs() : nullptr; for(const auto& codec : rr.supported_codecs().codecs()) { if ((!Ydb::Topic::Codec_IsValid(codec) && codec < Ydb::Topic::CODEC_CUSTOM) || codec == 0) { @@ -339,10 +293,6 @@ namespace NKikimr::NGRpcProxy::V1 { } cct->AddIds(codec - 1); cct->AddCodecs(Ydb::Topic::Codec_IsValid(codec) ? LegacySubstr(to_lower(Ydb::Topic::Codec_Name((Ydb::Topic::Codec)codec)), 6) : "CUSTOM"); - - if (NPQ::ReadRuleCompatible()) { - ct->CopyFrom(*cct); - } } if (rr.important()) { @@ -350,9 +300,6 @@ namespace NKikimr::NGRpcProxy::V1 { return TMsgPqCodes(TStringBuilder() << "important flag is forbiden for consumer " << rr.name(), Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT); } consumer->SetImportant(true); - if (NPQ::ReadRuleCompatible()) { - config->MutablePartitionConfig()->AddImportantClientId(consumerName); - } } return TMsgPqCodes("", Ydb::PersQueue::ErrorCode::OK); @@ -363,7 +310,7 @@ namespace NKikimr::NGRpcProxy::V1 { NKikimrPQ::TPQTabletConfig* config, const NKikimrPQ::TPQTabletConfig& originalConfig, const TString& consumerName, - const NKikimrPQ::TPQConfig& pqConfig + const NKikimrPQ::TPQConfig& /*pqConfig*/ ) { config->ClearReadRuleVersions(); config->ClearReadRules(); @@ -374,46 +321,8 @@ namespace NKikimr::NGRpcProxy::V1 { config->ClearReadRuleServiceTypes(); config->ClearConsumers(); - if (NPQ::ReadRuleCompatible()) { - for (const auto& importantConsumer : originalConfig.GetPartitionConfig().GetImportantClientId()) { - if (importantConsumer != consumerName) { - config->MutablePartitionConfig()->AddImportantClientId(importantConsumer); - } - } - } - bool removed = false; - if (NPQ::ReadRuleCompatible()) { - for (size_t i = 0; i < originalConfig.ReadRulesSize(); i++) { - auto& readRule = originalConfig.GetReadRules(i); - - if (readRule == consumerName) { - removed = true; - continue; - } - - config->AddReadRuleVersions(originalConfig.GetReadRuleVersions(i)); - config->AddReadRules(readRule); - config->AddReadFromTimestampsMs(originalConfig.GetReadFromTimestampsMs(i)); - config->AddConsumerFormatVersions(originalConfig.GetConsumerFormatVersions(i)); - auto* ct = config->AddConsumerCodecs(); - for (size_t j = 0; j < originalConfig.GetConsumerCodecs(i).CodecsSize(); j++) { - ct->AddCodecs(originalConfig.GetConsumerCodecs(i).GetCodecs(j)); - ct->AddIds(originalConfig.GetConsumerCodecs(i).GetIds(j)); - } - if (i < originalConfig.ReadRuleServiceTypesSize()) { - config->AddReadRuleServiceTypes(originalConfig.GetReadRuleServiceTypes(i)); - } else { - if (pqConfig.GetDisallowDefaultClientServiceType()) { - return TStringBuilder() << "service type cannot be empty for consumer '" - << readRule << "'"; - } - config->AddReadRuleServiceTypes(pqConfig.GetDefaultClientServiceType().GetName()); - } - } - } - for (auto& consumer : originalConfig.GetConsumers()) { if (consumerName == consumer.GetName()) { removed = true; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index c47927afa0d..4966ba9330a 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -5028,7 +5028,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; MaxCountInPartition: 2147483647 MaxSizeInPartition: 234 LifetimeSeconds: 172800 - ImportantClientId: "consumer" SourceIdLifetimeSeconds: 1382400 WriteSpeedInBytesPerSecond: 123 BurstSize: 1000 @@ -5078,22 +5077,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Ident: "acc" Topic: "topic3" DC: "dc1" - ReadRules: "first-consumer" - ReadRules: "consumer" - ReadFromTimestampsMs: 11223344000 - ReadFromTimestampsMs: 111000 - ConsumerFormatVersions: 0 - ConsumerFormatVersions: 0 - ConsumerCodecs { - } - ConsumerCodecs { - Ids: 2 - Ids: 10004 - Codecs: "lzop" - Codecs: "CUSTOM" - } - ReadRuleServiceTypes: "data-streams" - ReadRuleServiceTypes: "data-streams" FormatVersion: 0 Codecs { Ids: 2 @@ -5101,8 +5084,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Codecs: "lzop" Codecs: "CUSTOM" } - ReadRuleVersions: 0 - ReadRuleVersions: 567 TopicPath: "/Root/PQ/rt3.dc1--acc--topic3" YdbDatabasePath: "/Root" Consumers { @@ -5113,7 +5094,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; } ServiceType: "data-streams" Version: 0 - Important: false } Consumers { Name: "consumer" @@ -6101,6 +6081,7 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; auto checkDescribe = [&](const TString& topic, const TVector>& readRules) { + Cerr << ">>>>> Check topic: " << topic << Endl; DescribeTopicRequest request; DescribeTopicResponse response; request.set_path(topic); @@ -6110,7 +6091,8 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; UNIT_ASSERT(status.ok()); DescribeTopicResult res; response.operation().result().UnpackTo(&res); - Cerr << response << "\n" << res << "\n"; + Cerr << ">>>>> Response: " << response << Endl; + Cerr << ">>>>> Result:" << res << "\n"; UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); UNIT_ASSERT_VALUES_EQUAL(res.settings().read_rules().size(), readRules.size()); diff --git a/ydb/services/persqueue_v1/topic_yql_ut.cpp b/ydb/services/persqueue_v1/topic_yql_ut.cpp index 8f698f703dc..8068d52ac92 100644 --- a/ydb/services/persqueue_v1/topic_yql_ut.cpp +++ b/ydb/services/persqueue_v1/topic_yql_ut.cpp @@ -84,40 +84,45 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { auto after = CheckPQChildrenSize("after create"); UNIT_ASSERT_VALUES_EQUAL(after, before + 1); auto pqGroup = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription() - .GetPersQueueGroup(); - const auto& describeAfterCreate = pqGroup.GetPQTabletConfig(); - Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString(); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetLifetimeSeconds(), 3600); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetBurstSize(), 100500); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9000); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().ImportantClientIdSize(), 1); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionConfig().GetImportantClientId(0), "c2"); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetMinPartitionCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetMaxPartitionCount(), 5); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleThresholdSeconds(), 60); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 50); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 40); - UNIT_ASSERT_VALUES_EQUAL(static_cast(describeAfterCreate.GetPartitionStrategy().GetPartitionStrategyType()), static_cast(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT)); - - auto& codecs = describeAfterCreate.GetConsumerCodecs(1); - UNIT_ASSERT_VALUES_EQUAL(codecs.IdsSize(), 3); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetCodecs().IdsSize(), 2); - UNIT_ASSERT_VALUES_EQUAL(pqGroup.GetTotalGroupCount(), 2); - UNIT_ASSERT_VALUES_EQUAL(describeAfterCreate.GetReadFromTimestampsMs(1), 100 * 1000); - - auto expectedDescr = describeAfterCreate; + .GetPersQueueGroup(); + + { + const auto& describe = pqGroup.GetPQTabletConfig(); + Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString(); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetLifetimeSeconds(), 3600); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetBurstSize(), 100500); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9000); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMinPartitionCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetMaxPartitionCount(), 5); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleThresholdSeconds(), 60); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent(), 50); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent(), 40); + UNIT_ASSERT_VALUES_EQUAL(static_cast(describe.GetPartitionStrategy().GetPartitionStrategyType()), static_cast(::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT)); + UNIT_ASSERT_VALUES_EQUAL(pqGroup.GetTotalGroupCount(), 2); + + UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2); + + auto& consumer0 = describe.GetConsumers()[0]; + UNIT_ASSERT_VALUES_EQUAL(consumer0.GetName(), "c1"); + UNIT_ASSERT_VALUES_EQUAL(consumer0.GetImportant(), false); + + auto& consumer1 = describe.GetConsumers()[1]; + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetName(), "c2"); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), true); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 3); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 100 * 1000); + } + auto expectedDescr = pqGroup.GetPQTabletConfig(); { auto partCfg = expectedDescr.MutablePartitionConfig(); partCfg->SetLifetimeSeconds(7200); partCfg->SetBurstSize(100501); partCfg->SetWriteSpeedInBytesPerSecond(9001); - auto* rtfs = expectedDescr.MutableReadFromTimestampsMs(); - rtfs->Set(1, 1609462861000); expectedDescr.MutablePartitionStrategy()->SetMinPartitionCount(3); - expectedDescr.MutableConsumers(1)->SetReadFromTimestampsMs(1609462861000); } + const char *query2 = R"__( ALTER TOPIC `/Root/PQ/rt3.dc1--legacy--topic1` ALTER CONSUMER c2 SET (read_from = Timestamp('2021-01-01T01:01:01Z')), @@ -150,20 +155,30 @@ Y_UNIT_TEST_SUITE(TTopicYqlTest) { server.AnnoyingClient->RunYqlSchemeQuery(query3); pqGroup2 = server.AnnoyingClient->Ls("/Root/PQ/rt3.dc1--legacy--topic1")->Record.GetPathDescription() - .GetPersQueueGroup(); - const auto& descr3 = pqGroup2.GetPQTabletConfig(); - Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup.DebugString(); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetLifetimeSeconds(), 7200); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetBurstSize(), 100501); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9001); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().ImportantClientIdSize(), 1); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetPartitionConfig().GetImportantClientId(0), "c3"); - - UNIT_ASSERT_VALUES_EQUAL(descr3.GetConsumerCodecs(0).IdsSize(), 2); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetCodecs().IdsSize(), 1); - UNIT_ASSERT_VALUES_EQUAL(descr3.GetReadFromTimestampsMs(0), 1609462861000); - UNIT_ASSERT_VALUES_EQUAL(pqGroup2.GetTotalGroupCount(), 2); + .GetPersQueueGroup(); + { + const auto& describe = pqGroup2.GetPQTabletConfig(); + Cerr <<"=== PATH DESCRIPTION: \n" << pqGroup2.DebugString(); + UNIT_ASSERT_VALUES_EQUAL(pqGroup2.GetTotalGroupCount(), 2); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetLifetimeSeconds(), 7200); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetBurstSize(), 100501); + UNIT_ASSERT_VALUES_EQUAL(describe.GetPartitionConfig().GetWriteSpeedInBytesPerSecond(), 9001); + + UNIT_ASSERT_VALUES_EQUAL(describe.GetConsumers().size(), 2); + + auto& consumer1 = describe.GetConsumers(0); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetName(), "c2"); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetImportant(), false); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetReadFromTimestampsMs(), 1609462861000); + UNIT_ASSERT_VALUES_EQUAL(consumer1.GetCodec().IdsSize(), 2); + + auto& consumer2 = describe.GetConsumers(1); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetName(), "c3"); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetImportant(), true); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetCodec().IdsSize(), 0); + UNIT_ASSERT_VALUES_EQUAL(consumer2.GetReadFromTimestampsMs(), 0); + } //1609462861 } -- cgit v1.3