diff options
author | abcdef <akotov@ydb.tech> | 2023-03-29 13:25:48 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-03-29 13:25:48 +0300 |
commit | 6c0f0a31d91f022806ac938fef45dc9d6c125fb4 (patch) | |
tree | 2f6381e8c5cfab7d72569d43b9f4a7a5ffc698f4 | |
parent | d7fbc6b4ba21815810594ca5941788d47442ee80 (diff) | |
download | ydb-6c0f0a31d91f022806ac938fef45dc9d6c125fb4.tar.gz |
PERSQUEUE_READ_SPEED_LIMITER actors are leaking
Перенесены изменения из задачи
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 177 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 11 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.cpp | 14 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 31 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/counters_ut.cpp | 19 |
5 files changed, 92 insertions, 160 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 17dc5592bce..0d3880213e5 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1976,7 +1976,6 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas return jt; } - void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { TSet<TString> important; for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) { @@ -2005,7 +2004,6 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { } } - void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) { PushBackDistrTx(ev->Release()); @@ -3726,11 +3724,11 @@ void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContex void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) { Y_VERIFY(cookie == SET_OFFSET_COOKIE); + if (ChangeConfig) { EndChangePartitionConfig(ChangeConfig->Config, ChangeConfig->TopicConverter, ctx); - ChangeConfig = nullptr; } for (auto& user : AffectedUsers) { @@ -3743,13 +3741,17 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) userInfo.Step = actual->Step; userInfo.Offset = actual->Offset; userInfo.ReadRuleGeneration = actual->ReadRuleGeneration; + userInfo.ReadFromTimestamp = actual->ReadFromTimestamp; + userInfo.HasReadRule = true; - if (PendingHasReadRule.contains(user)) { - userInfo.HasReadRule = true; + if (userInfo.Important != actual->Important) { + if (userInfo.LabeledCounters) { + ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); + } + userInfo.SetImportant(actual->Important); } - - if (auto p = PendingReadFromTimestamp.find(user); p != PendingReadFromTimestamp.end()) { - userInfo.ReadFromTimestamp = p->second; + if (userInfo.Important && userInfo.Offset < (i64)StartOffset) { + userInfo.Offset = StartOffset; } if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) { @@ -3759,13 +3761,12 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1); } } else { - UsersInfoStorage->Remove(user, ctx); - } - } + auto ui = UsersInfoStorage->GetIfExists(user); + if (ui && ui->LabeledCounters) { + ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup()); + } - for (auto& [consumer, important] : PendingSetImportant) { - if (auto* userInfo = UsersInfoStorage->GetIfExists(consumer); userInfo) { - userInfo->SetImportant(important); + UsersInfoStorage->Remove(user, ctx); } } @@ -3776,14 +3777,17 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) PendingUsersInfo.clear(); Replies.clear(); AffectedUsers.clear(); - PendingReadFromTimestamp.clear(); - PendingSetImportant.clear(); - PendingHasReadRule.clear(); UsersInfoWriteInProgress = false; TxIdHasChanged = false; + if (ChangeConfig) { + ReportCounters(ctx); + ChangeConfig = nullptr; + } + + ProcessTxsAndUserActs(ctx); } @@ -3908,6 +3912,7 @@ void TPartition::ProcessDistrTxs(const TActorContext& ctx) bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, const TActorContext& ctx) { + Y_UNUSED(ctx); bool predicate = true; for (auto& operation : tx.Operations) { @@ -3924,7 +3929,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, } bool isAffectedConsumer = AffectedUsers.contains(consumer); - TUserInfo& userInfo = GetOrCreatePendingUser(consumer, ctx); + TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer); if (operation.GetBegin() > operation.GetEnd()) { // BAD_REQUEST @@ -3977,7 +3982,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event, Y_VERIFY(t.Predicate.Defined() && *t.Predicate); for (auto& operation : t.Tx->Operations) { - TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx); + TUserInfoBase& userInfo = GetOrCreatePendingUser(operation.GetConsumer()); Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin()); @@ -4041,64 +4046,45 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event, void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { - InitPendingUserInfoForImportantClients(config, ctx); - TSet<TString> hasReadRule; for (auto& [consumer, info] : UsersInfoStorage->GetAll()) { - PendingReadFromTimestamp[consumer] = TInstant::Zero(); - hasReadRule.insert(consumer); } + TSet<TString> important; + for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) { + important.insert(importantUser); + } + for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { const auto& consumer = config.GetReadRules(i); - auto& userInfo = GetOrCreatePendingUser(consumer, ctx, 0); + auto& userInfo = GetOrCreatePendingUser(consumer, 0); + + TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); + if (!ts) { + ts += TDuration::MilliSeconds(1); + } + userInfo.ReadFromTimestamp = ts; + userInfo.Important = important.contains(consumer); ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0; if (userInfo.ReadRuleGeneration != rrGen) { TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); - userInfo.Session = ""; - userInfo.Offset = 0; - if (userInfo.Important) { - userInfo.Offset = StartOffset; - } - userInfo.Step = userInfo.Generation = 0; - ProcessUserAct(act, ctx); } - - userInfo.HasReadRule = true; hasReadRule.erase(consumer); - - TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); - if (!ts) { - ts += TDuration::MilliSeconds(1); - } - if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) { - PendingReadFromTimestamp[consumer] = ts; - } - - PendingHasReadRule.insert(consumer); } for (auto& consumer : hasReadRule) { - auto& userInfo = GetOrCreatePendingUser(consumer, ctx); + GetOrCreatePendingUser(consumer); TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); - ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); - - userInfo.Session = ""; - userInfo.Offset = 0; - userInfo.Step = userInfo.Generation = 0; - ProcessUserAct(act, ctx); } - - ReportCounters(ctx); } void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, @@ -4124,10 +4110,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); } - if (CurrentStateFunc() != &TThis::StateInit) { - InitUserInfoForImportantClients(ctx); - } - if (Config.GetPartitionConfig().HasMirrorFrom()) { if (Mirrorer) { ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, @@ -4145,7 +4127,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf if (SendChangeConfigReply) { SchedulePartitionConfigChanged(); } - ReportCounters(ctx); } TString TPartition::GetKeyConfig() const @@ -4168,41 +4149,6 @@ void TPartition::ResendPendingEvents(const TActorContext& ctx) } } -void TPartition::InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config, - const TActorContext& ctx) { - TSet<TString> important; - - for (const auto& consumer : config.GetPartitionConfig().GetImportantClientId()) { - important.insert(consumer); - - TUserInfo* userInfo = GetPendingUserIfExists(consumer); - - if (userInfo && !userInfo->Important && userInfo->LabeledCounters) { - ScheduleDropPartitionLabeledCounters(userInfo->LabeledCounters->GetGroup()); - PendingSetImportant[consumer] = true; - continue; - } - - if (!userInfo) { - userInfo = &GetOrCreatePendingUser(consumer, ctx, 0); - PendingSetImportant[consumer] = true; - } - - if (userInfo->Offset < (i64)StartOffset) { - userInfo->Offset = StartOffset; - } - - //ReadTimestampForOffset(consumer, *userInfo, ctx); - } - - for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) { - if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) { - ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); - PendingSetImportant[consumer] = false; - } - } -} - void TPartition::ProcessDistrTx(const TActorContext& ctx) { Y_VERIFY(!TxInProgress); @@ -4254,6 +4200,8 @@ void TPartition::ProcessImmediateTxs(const TActorContext& ctx) void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx, const TActorContext& ctx) { + Y_UNUSED(ctx); + Y_VERIFY(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); Y_VERIFY(tx.HasData()); @@ -4271,7 +4219,7 @@ void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx, return; } - TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx); + TUserInfoBase& userInfo = GetOrCreatePendingUser(user); if (operation.GetBegin() > operation.GetEnd()) { ScheduleReplyPropose(tx, @@ -4332,7 +4280,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, } } - TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx); + TUserInfoBase& userInfo = GetOrCreatePendingUser(user); if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE) { LOG_DEBUG_S( @@ -4347,7 +4295,8 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, } if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION && act.SessionId == userInfo.Session) { //this is retry of current request, answer ok - auto ts = GetTime(userInfo, userInfo.Offset); + auto *ui = UsersInfoStorage->GetIfExists(userInfo.User); + auto ts = ui ? GetTime(*ui, userInfo.Offset) : std::make_pair<TInstant, TInstant>(TInstant::Zero(), TInstant::Zero()); ScheduleReplyGetClientOffsetOk(act.Cookie, userInfo.Offset, @@ -4432,7 +4381,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, } void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, - TUserInfo& userInfo, + TUserInfoBase& userInfo, const TActorContext& ctx) { const TString& user = act.ClientId; @@ -4457,7 +4406,6 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, "Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user << " drop done" ); - PendingUsersInfo.erase(user); } else if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE) { LOG_DEBUG_S( @@ -4470,7 +4418,6 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, userInfo.Session = ""; userInfo.Generation = userInfo.Step = 0; userInfo.Offset = 0; - userInfo.ReadScheduled = false; if (userInfo.Important) { userInfo.Offset = StartOffset; @@ -4478,7 +4425,8 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, } else { if (setSession || dropSession) { offset = userInfo.Offset; - auto ts = GetTime(userInfo, offset); + auto *ui = UsersInfoStorage->GetIfExists(userInfo.User); + auto ts = ui ? GetTime(*ui, userInfo.Offset) : std::make_pair<TInstant, TInstant>(TInstant::Zero(), TInstant::Zero()); ScheduleReplyGetClientOffsetOk(act.Cookie, offset, @@ -4644,11 +4592,12 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request) TKeyPrefix ikeyDeprecated(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUserDeprecated); ikeyDeprecated.Append(user.c_str(), user.size()); - if (TUserInfo* userInfo = GetPendingUserIfExists(user)) { + if (TUserInfoBase* userInfo = GetPendingUserIfExists(user)) { + auto *ui = UsersInfoStorage->GetIfExists(user); AddCmdWrite(request, ikey, ikeyDeprecated, userInfo->Offset, userInfo->Generation, userInfo->Step, userInfo->Session, - userInfo->ReadOffsetRewindSum, + ui ? ui->ReadOffsetRewindSum : 0, userInfo->ReadRuleGeneration); } else { AddCmdDeleteRange(request, @@ -4674,25 +4623,25 @@ void TPartition::AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request) write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); } -TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user, - const TActorContext& ctx, - TMaybe<ui64> readRuleGeneration) +TUserInfoBase& TPartition::GetOrCreatePendingUser(const TString& user, + TMaybe<ui64> readRuleGeneration) { - TUserInfo* userInfo = UsersInfoStorage->GetIfExists(user); + TUserInfoBase* userInfo = nullptr; auto i = PendingUsersInfo.find(user); if (i == PendingUsersInfo.end()) { + auto ui = UsersInfoStorage->GetIfExists(user); auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user, - ctx, readRuleGeneration)); - if (userInfo) { - p->second.Session = userInfo->Session; - p->second.Generation = userInfo->Generation; - p->second.Step = userInfo->Step; - p->second.Offset = userInfo->Offset; - p->second.ReadRuleGeneration = userInfo->ReadRuleGeneration; - p->second.ReadScheduled = userInfo->ReadScheduled; + if (ui) { + p->second.Session = ui->Session; + p->second.Generation = ui->Generation; + p->second.Step = ui->Step; + p->second.Offset = ui->Offset; + p->second.ReadRuleGeneration = ui->ReadRuleGeneration; + p->second.Important = ui->Important; + p->second.ReadFromTimestamp = ui->ReadFromTimestamp; } userInfo = &p->second; @@ -4705,7 +4654,7 @@ TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user, return *userInfo; } -TUserInfo* TPartition::GetPendingUserIfExists(const TString& user) +TUserInfoBase* TPartition::GetPendingUserIfExists(const TString& user) { if (auto i = PendingUsersInfo.find(user); i != PendingUsersInfo.end()) { return &i->second; diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 05ec9e0b556..2691e073765 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -241,7 +241,7 @@ private: void ProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx); void EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act, - TUserInfo& userInfo, + TUserInfoBase& userInfo, const TActorContext& ctx); void ScheduleReplyOk(const ui64 dst); @@ -269,8 +269,8 @@ private: void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request, const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated); - TUserInfo& GetOrCreatePendingUser(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration = {}); - TUserInfo* GetPendingUserIfExists(const TString& user); + TUserInfoBase& GetOrCreatePendingUser(const TString& user, TMaybe<ui64> readRuleGeneration = {}); + TUserInfoBase* GetPendingUserIfExists(const TString& user); THolder<TEvPQ::TEvProxyResponse> MakeReplyOk(const ui64 dst); THolder<TEvPQ::TEvProxyResponse> MakeReplyGetClientOffsetOk(const ui64 dst, @@ -585,10 +585,7 @@ private: std::deque<TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>> ImmediateTxs; std::deque<TTransaction> DistrTxs; THashMap<TString, size_t> UserActCount; - THashMap<TString, TUserInfo> PendingUsersInfo; - THashMap<TString, TInstant> PendingReadFromTimestamp; - THashMap<TString, bool> PendingSetImportant; - THashSet<TString> PendingHasReadRule; + THashMap<TString, TUserInfoBase> PendingUsersInfo; TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies; THashSet<TString> AffectedUsers; bool UsersInfoWriteInProgress = false; diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp index 89e63abb720..b64c92609f7 100644 --- a/ydb/core/persqueue/user_info.cpp +++ b/ydb/core/persqueue/user_info.cpp @@ -192,24 +192,18 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx, bool meterRead = userServiceType.empty() || userServiceType == defaultServiceType; - return { + return { ctx, StreamCountersSubgroup, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition, session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, DbPath, meterRead, burst, speed }; } -TUserInfo TUsersInfoStorage::CreateUserInfo(const TString& user, - const TActorContext& ctx, +TUserInfoBase TUsersInfoStorage::CreateUserInfo(const TString& user, TMaybe<ui64> readRuleGeneration) const { - return CreateUserInfo(ctx, - user, - readRuleGeneration ? *readRuleGeneration : ++CurReadRuleGeneration, - false, - "", - 0, 0, 0, 0, - TInstant::Zero()); + return TUserInfoBase{user, readRuleGeneration ? *readRuleGeneration : ++CurReadRuleGeneration, + "", 0, 0, 0, false, {}}; } TUserInfo& TUsersInfoStorage::Create( diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index 034edacdc21..8e5faf9d0e8 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -47,13 +47,22 @@ struct TReadSpeedLimiterHolder { TTabletCountersBase Baseline; }; -struct TUserInfo { - THolder<TReadSpeedLimiterHolder> ReadSpeedLimiter; +struct TUserInfoBase { + TString User; + ui64 ReadRuleGeneration = 0; TString Session = ""; ui32 Generation = 0; ui32 Step = 0; i64 Offset = 0; + + bool Important = false; + TInstant ReadFromTimestamp; +}; + +struct TUserInfo: public TUserInfoBase { + THolder<TReadSpeedLimiterHolder> ReadSpeedLimiter; + TInstant WriteTimestamp; TInstant CreateTimestamp; TInstant ReadTimestamp; @@ -70,12 +79,8 @@ struct TUserInfo { //When client will commit to new position, timestamps for this offset could be in cache - not insane client should read data before commit std::deque<std::pair<ui64, std::pair<TInstant, TInstant>>> Cache; - bool Important = false; - TInstant ReadFromTimestamp; bool HasReadRule = false; THolder<TUserLabeledCounters> LabeledCounters; - TString User; - ui64 ReadRuleGeneration = 0; NPersQueue::TTopicConverterPtr TopicConverter; std::deque<std::pair<TReadInfo, ui64>> ReadRequests; @@ -169,11 +174,8 @@ struct TUserInfo { const TString& dbPath, bool meterRead, ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000 ) - : ReadSpeedLimiter(std::move(readSpeedLimiter)) - , Session(session) - , Generation(gen) - , Step(step) - , Offset(offset) + : TUserInfoBase{user, readRuleGeneration, session, gen, step, offset, important, readFromTimestamp} + , ReadSpeedLimiter(std::move(readSpeedLimiter)) , WriteTimestamp(TAppData::TimeProvider->Now()) , CreateTimestamp(TAppData::TimeProvider->Now()) , ReadTimestamp(TAppData::TimeProvider->Now()) @@ -183,11 +185,7 @@ struct TUserInfo { , ReadCreateTimestamp(TAppData::TimeProvider->Now()) , ReadOffsetRewindSum(readOffsetRewindSum) , ReadScheduled(false) - , Important(important) - , ReadFromTimestamp(readFromTimestamp) , HasReadRule(false) - , User(user) - , ReadRuleGeneration(readRuleGeneration) , TopicConverter(topicConverter) , ReadQuota(burst, speed, TAppData::TimeProvider->Now()) , Counter(nullptr) @@ -397,8 +395,7 @@ public: THashMap<TString, TUserInfo>& GetAll(); - TUserInfo CreateUserInfo(const TString& user, - const TActorContext& ctx, + TUserInfoBase CreateUserInfo(const TString& user, TMaybe<ui64> readRuleGeneration = {}) const; TUserInfo& Create( const TActorContext& ctx, const TString& user, const ui64 readRuleGeneration, bool important, const TString &session, diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index 04aae14c11c..1d5973fb6b1 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -288,12 +288,15 @@ void CheckLabeledCountersResponse(TTestContext& tc, ui32 count, TVector<TString> THashSet<TString> groups; + Cerr << "NEW ANS:\n"; for (ui32 i = 0; i < result->Record.LabeledCountersByGroupSize(); ++i) { auto& c = result->Record.GetLabeledCountersByGroup(i); groups.insert(c.GetGroup()); + Cerr << "ANS GROUP " << c.GetGroup() << "\n"; } UNIT_ASSERT_VALUES_EQUAL(groups.size(), count); for (auto& g : mustHave) { + Cerr << "CHECKING GROUP " << g << "\n"; UNIT_ASSERT(groups.contains(g)); } } @@ -365,29 +368,21 @@ Y_UNIT_TEST(ImportantFlagSwitching) { CheckLabeledCountersResponse(tc, 11, MakeTopics({"user/1", "user2/1"})); PQTabletPrepare({}, {{"user", true}, {"user2", false}}, tc); - { - TDispatchOptions options; - options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters); - tc.Runtime->DispatchEvents(options); - } - { + for (ui32 i = 0 ; i < 2; ++i){ TDispatchOptions options; options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters); tc.Runtime->DispatchEvents(options); } + CheckLabeledCountersResponse(tc, 12, MakeTopics({"user/1", "user2/0"})); PQTabletPrepare({}, {{"user", true}}, tc); - { - TDispatchOptions options; - options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters); - tc.Runtime->DispatchEvents(options); - } - { + for (ui32 i = 0 ; i < 2; ++i){ TDispatchOptions options; options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters); tc.Runtime->DispatchEvents(options); } + CheckLabeledCountersResponse(tc, 8, MakeTopics({"user/1"})); }); } |