aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-03-29 13:25:48 +0300
committerabcdef <akotov@ydb.tech>2023-03-29 13:25:48 +0300
commit6c0f0a31d91f022806ac938fef45dc9d6c125fb4 (patch)
tree2f6381e8c5cfab7d72569d43b9f4a7a5ffc698f4
parentd7fbc6b4ba21815810594ca5941788d47442ee80 (diff)
downloadydb-6c0f0a31d91f022806ac938fef45dc9d6c125fb4.tar.gz
PERSQUEUE_READ_SPEED_LIMITER actors are leaking
Перенесены изменения из задачи
-rw-r--r--ydb/core/persqueue/partition.cpp177
-rw-r--r--ydb/core/persqueue/partition.h11
-rw-r--r--ydb/core/persqueue/user_info.cpp14
-rw-r--r--ydb/core/persqueue/user_info.h31
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp19
5 files changed, 92 insertions, 160 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 17dc5592bce..0d3880213e5 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1976,7 +1976,6 @@ THashMap<TString, NKikimr::NPQ::TOwnerInfo>::iterator TPartition::DropOwner(THas
return jt;
}
-
void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
TSet<TString> important;
for (const auto& importantUser : Config.GetPartitionConfig().GetImportantClientId()) {
@@ -2005,7 +2004,6 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
}
}
-
void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) {
PushBackDistrTx(ev->Release());
@@ -3726,11 +3724,11 @@ void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContex
void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) {
Y_VERIFY(cookie == SET_OFFSET_COOKIE);
+
if (ChangeConfig) {
EndChangePartitionConfig(ChangeConfig->Config,
ChangeConfig->TopicConverter,
ctx);
- ChangeConfig = nullptr;
}
for (auto& user : AffectedUsers) {
@@ -3743,13 +3741,17 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
userInfo.Step = actual->Step;
userInfo.Offset = actual->Offset;
userInfo.ReadRuleGeneration = actual->ReadRuleGeneration;
+ userInfo.ReadFromTimestamp = actual->ReadFromTimestamp;
+ userInfo.HasReadRule = true;
- if (PendingHasReadRule.contains(user)) {
- userInfo.HasReadRule = true;
+ if (userInfo.Important != actual->Important) {
+ if (userInfo.LabeledCounters) {
+ ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
+ }
+ userInfo.SetImportant(actual->Important);
}
-
- if (auto p = PendingReadFromTimestamp.find(user); p != PendingReadFromTimestamp.end()) {
- userInfo.ReadFromTimestamp = p->second;
+ if (userInfo.Important && userInfo.Offset < (i64)StartOffset) {
+ userInfo.Offset = StartOffset;
}
if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) {
@@ -3759,13 +3761,12 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
}
} else {
- UsersInfoStorage->Remove(user, ctx);
- }
- }
+ auto ui = UsersInfoStorage->GetIfExists(user);
+ if (ui && ui->LabeledCounters) {
+ ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
+ }
- for (auto& [consumer, important] : PendingSetImportant) {
- if (auto* userInfo = UsersInfoStorage->GetIfExists(consumer); userInfo) {
- userInfo->SetImportant(important);
+ UsersInfoStorage->Remove(user, ctx);
}
}
@@ -3776,14 +3777,17 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
PendingUsersInfo.clear();
Replies.clear();
AffectedUsers.clear();
- PendingReadFromTimestamp.clear();
- PendingSetImportant.clear();
- PendingHasReadRule.clear();
UsersInfoWriteInProgress = false;
TxIdHasChanged = false;
+ if (ChangeConfig) {
+ ReportCounters(ctx);
+ ChangeConfig = nullptr;
+ }
+
+
ProcessTxsAndUserActs(ctx);
}
@@ -3908,6 +3912,7 @@ void TPartition::ProcessDistrTxs(const TActorContext& ctx)
bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
const TActorContext& ctx)
{
+ Y_UNUSED(ctx);
bool predicate = true;
for (auto& operation : tx.Operations) {
@@ -3924,7 +3929,7 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
}
bool isAffectedConsumer = AffectedUsers.contains(consumer);
- TUserInfo& userInfo = GetOrCreatePendingUser(consumer, ctx);
+ TUserInfoBase& userInfo = GetOrCreatePendingUser(consumer);
if (operation.GetBegin() > operation.GetEnd()) {
// BAD_REQUEST
@@ -3977,7 +3982,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event,
Y_VERIFY(t.Predicate.Defined() && *t.Predicate);
for (auto& operation : t.Tx->Operations) {
- TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx);
+ TUserInfoBase& userInfo = GetOrCreatePendingUser(operation.GetConsumer());
Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin());
@@ -4041,64 +4046,45 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event,
void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx)
{
- InitPendingUserInfoForImportantClients(config, ctx);
-
TSet<TString> hasReadRule;
for (auto& [consumer, info] : UsersInfoStorage->GetAll()) {
- PendingReadFromTimestamp[consumer] = TInstant::Zero();
-
hasReadRule.insert(consumer);
}
+ TSet<TString> important;
+ for (const auto& importantUser : config.GetPartitionConfig().GetImportantClientId()) {
+ important.insert(importantUser);
+ }
+
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
- auto& userInfo = GetOrCreatePendingUser(consumer, ctx, 0);
+ auto& userInfo = GetOrCreatePendingUser(consumer, 0);
+
+ TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
+ if (!ts) {
+ ts += TDuration::MilliSeconds(1);
+ }
+ userInfo.ReadFromTimestamp = ts;
+ userInfo.Important = important.contains(consumer);
ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
if (userInfo.ReadRuleGeneration != rrGen) {
TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0,
TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);
- userInfo.Session = "";
- userInfo.Offset = 0;
- if (userInfo.Important) {
- userInfo.Offset = StartOffset;
- }
- userInfo.Step = userInfo.Generation = 0;
-
ProcessUserAct(act, ctx);
}
-
- userInfo.HasReadRule = true;
hasReadRule.erase(consumer);
-
- TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
- if (!ts) {
- ts += TDuration::MilliSeconds(1);
- }
- if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) {
- PendingReadFromTimestamp[consumer] = ts;
- }
-
- PendingHasReadRule.insert(consumer);
}
for (auto& consumer : hasReadRule) {
- auto& userInfo = GetOrCreatePendingUser(consumer, ctx);
+ GetOrCreatePendingUser(consumer);
TEvPQ::TEvSetClientInfo act(0, consumer,
0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);
- ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
-
- userInfo.Session = "";
- userInfo.Offset = 0;
- userInfo.Step = userInfo.Generation = 0;
-
ProcessUserAct(act, ctx);
}
-
- ReportCounters(ctx);
}
void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
@@ -4124,10 +4110,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
}
- if (CurrentStateFunc() != &TThis::StateInit) {
- InitUserInfoForImportantClients(ctx);
- }
-
if (Config.GetPartitionConfig().HasMirrorFrom()) {
if (Mirrorer) {
ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter,
@@ -4145,7 +4127,6 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
if (SendChangeConfigReply) {
SchedulePartitionConfigChanged();
}
- ReportCounters(ctx);
}
TString TPartition::GetKeyConfig() const
@@ -4168,41 +4149,6 @@ void TPartition::ResendPendingEvents(const TActorContext& ctx)
}
}
-void TPartition::InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config,
- const TActorContext& ctx) {
- TSet<TString> important;
-
- for (const auto& consumer : config.GetPartitionConfig().GetImportantClientId()) {
- important.insert(consumer);
-
- TUserInfo* userInfo = GetPendingUserIfExists(consumer);
-
- if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
- ScheduleDropPartitionLabeledCounters(userInfo->LabeledCounters->GetGroup());
- PendingSetImportant[consumer] = true;
- continue;
- }
-
- if (!userInfo) {
- userInfo = &GetOrCreatePendingUser(consumer, ctx, 0);
- PendingSetImportant[consumer] = true;
- }
-
- if (userInfo->Offset < (i64)StartOffset) {
- userInfo->Offset = StartOffset;
- }
-
- //ReadTimestampForOffset(consumer, *userInfo, ctx);
- }
-
- for (auto& [consumer, userInfo] : UsersInfoStorage->GetAll()) {
- if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
- ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
- PendingSetImportant[consumer] = false;
- }
- }
-}
-
void TPartition::ProcessDistrTx(const TActorContext& ctx)
{
Y_VERIFY(!TxInProgress);
@@ -4254,6 +4200,8 @@ void TPartition::ProcessImmediateTxs(const TActorContext& ctx)
void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
const TActorContext& ctx)
{
+ Y_UNUSED(ctx);
+
Y_VERIFY(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_VERIFY(tx.HasData());
@@ -4271,7 +4219,7 @@ void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
return;
}
- TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx);
+ TUserInfoBase& userInfo = GetOrCreatePendingUser(user);
if (operation.GetBegin() > operation.GetEnd()) {
ScheduleReplyPropose(tx,
@@ -4332,7 +4280,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
}
}
- TUserInfo& userInfo = GetOrCreatePendingUser(user, ctx);
+ TUserInfoBase& userInfo = GetOrCreatePendingUser(user);
if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE) {
LOG_DEBUG_S(
@@ -4347,7 +4295,8 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
}
if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_CREATE_SESSION && act.SessionId == userInfo.Session) { //this is retry of current request, answer ok
- auto ts = GetTime(userInfo, userInfo.Offset);
+ auto *ui = UsersInfoStorage->GetIfExists(userInfo.User);
+ auto ts = ui ? GetTime(*ui, userInfo.Offset) : std::make_pair<TInstant, TInstant>(TInstant::Zero(), TInstant::Zero());
ScheduleReplyGetClientOffsetOk(act.Cookie,
userInfo.Offset,
@@ -4432,7 +4381,7 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
}
void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
- TUserInfo& userInfo,
+ TUserInfoBase& userInfo,
const TActorContext& ctx)
{
const TString& user = act.ClientId;
@@ -4457,7 +4406,6 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
"Topic '" << TopicConverter->GetClientsideName() << "' partition " << Partition << " user " << user
<< " drop done"
);
-
PendingUsersInfo.erase(user);
} else if (act.Type == TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE) {
LOG_DEBUG_S(
@@ -4470,7 +4418,6 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
userInfo.Session = "";
userInfo.Generation = userInfo.Step = 0;
userInfo.Offset = 0;
- userInfo.ReadScheduled = false;
if (userInfo.Important) {
userInfo.Offset = StartOffset;
@@ -4478,7 +4425,8 @@ void TPartition::EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
} else {
if (setSession || dropSession) {
offset = userInfo.Offset;
- auto ts = GetTime(userInfo, offset);
+ auto *ui = UsersInfoStorage->GetIfExists(userInfo.User);
+ auto ts = ui ? GetTime(*ui, userInfo.Offset) : std::make_pair<TInstant, TInstant>(TInstant::Zero(), TInstant::Zero());
ScheduleReplyGetClientOffsetOk(act.Cookie,
offset,
@@ -4644,11 +4592,12 @@ void TPartition::AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request)
TKeyPrefix ikeyDeprecated(TKeyPrefix::TypeInfo, Partition, TKeyPrefix::MarkUserDeprecated);
ikeyDeprecated.Append(user.c_str(), user.size());
- if (TUserInfo* userInfo = GetPendingUserIfExists(user)) {
+ if (TUserInfoBase* userInfo = GetPendingUserIfExists(user)) {
+ auto *ui = UsersInfoStorage->GetIfExists(user);
AddCmdWrite(request,
ikey, ikeyDeprecated,
userInfo->Offset, userInfo->Generation, userInfo->Step, userInfo->Session,
- userInfo->ReadOffsetRewindSum,
+ ui ? ui->ReadOffsetRewindSum : 0,
userInfo->ReadRuleGeneration);
} else {
AddCmdDeleteRange(request,
@@ -4674,25 +4623,25 @@ void TPartition::AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request)
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
}
-TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user,
- const TActorContext& ctx,
- TMaybe<ui64> readRuleGeneration)
+TUserInfoBase& TPartition::GetOrCreatePendingUser(const TString& user,
+ TMaybe<ui64> readRuleGeneration)
{
- TUserInfo* userInfo = UsersInfoStorage->GetIfExists(user);
+ TUserInfoBase* userInfo = nullptr;
auto i = PendingUsersInfo.find(user);
if (i == PendingUsersInfo.end()) {
+ auto ui = UsersInfoStorage->GetIfExists(user);
auto [p, _] = PendingUsersInfo.emplace(user, UsersInfoStorage->CreateUserInfo(user,
- ctx,
readRuleGeneration));
- if (userInfo) {
- p->second.Session = userInfo->Session;
- p->second.Generation = userInfo->Generation;
- p->second.Step = userInfo->Step;
- p->second.Offset = userInfo->Offset;
- p->second.ReadRuleGeneration = userInfo->ReadRuleGeneration;
- p->second.ReadScheduled = userInfo->ReadScheduled;
+ if (ui) {
+ p->second.Session = ui->Session;
+ p->second.Generation = ui->Generation;
+ p->second.Step = ui->Step;
+ p->second.Offset = ui->Offset;
+ p->second.ReadRuleGeneration = ui->ReadRuleGeneration;
+ p->second.Important = ui->Important;
+ p->second.ReadFromTimestamp = ui->ReadFromTimestamp;
}
userInfo = &p->second;
@@ -4705,7 +4654,7 @@ TUserInfo& TPartition::GetOrCreatePendingUser(const TString& user,
return *userInfo;
}
-TUserInfo* TPartition::GetPendingUserIfExists(const TString& user)
+TUserInfoBase* TPartition::GetPendingUserIfExists(const TString& user)
{
if (auto i = PendingUsersInfo.find(user); i != PendingUsersInfo.end()) {
return &i->second;
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 05ec9e0b556..2691e073765 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -241,7 +241,7 @@ private:
void ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
const TActorContext& ctx);
void EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
- TUserInfo& userInfo,
+ TUserInfoBase& userInfo,
const TActorContext& ctx);
void ScheduleReplyOk(const ui64 dst);
@@ -269,8 +269,8 @@ private:
void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated);
- TUserInfo& GetOrCreatePendingUser(const TString& user, const TActorContext& ctx, TMaybe<ui64> readRuleGeneration = {});
- TUserInfo* GetPendingUserIfExists(const TString& user);
+ TUserInfoBase& GetOrCreatePendingUser(const TString& user, TMaybe<ui64> readRuleGeneration = {});
+ TUserInfoBase* GetPendingUserIfExists(const TString& user);
THolder<TEvPQ::TEvProxyResponse> MakeReplyOk(const ui64 dst);
THolder<TEvPQ::TEvProxyResponse> MakeReplyGetClientOffsetOk(const ui64 dst,
@@ -585,10 +585,7 @@ private:
std::deque<TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>> ImmediateTxs;
std::deque<TTransaction> DistrTxs;
THashMap<TString, size_t> UserActCount;
- THashMap<TString, TUserInfo> PendingUsersInfo;
- THashMap<TString, TInstant> PendingReadFromTimestamp;
- THashMap<TString, bool> PendingSetImportant;
- THashSet<TString> PendingHasReadRule;
+ THashMap<TString, TUserInfoBase> PendingUsersInfo;
TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies;
THashSet<TString> AffectedUsers;
bool UsersInfoWriteInProgress = false;
diff --git a/ydb/core/persqueue/user_info.cpp b/ydb/core/persqueue/user_info.cpp
index 89e63abb720..b64c92609f7 100644
--- a/ydb/core/persqueue/user_info.cpp
+++ b/ydb/core/persqueue/user_info.cpp
@@ -192,24 +192,18 @@ TUserInfo TUsersInfoStorage::CreateUserInfo(const TActorContext& ctx,
bool meterRead = userServiceType.empty() || userServiceType == defaultServiceType;
- return {
+ return {
ctx, StreamCountersSubgroup, CreateReadSpeedLimiter(user), user, readRuleGeneration, important, TopicConverter, Partition,
session, gen, step, offset, readOffsetRewindSum, DCId, readFromTimestamp, DbPath,
meterRead, burst, speed
};
}
-TUserInfo TUsersInfoStorage::CreateUserInfo(const TString& user,
- const TActorContext& ctx,
+TUserInfoBase TUsersInfoStorage::CreateUserInfo(const TString& user,
TMaybe<ui64> readRuleGeneration) const
{
- return CreateUserInfo(ctx,
- user,
- readRuleGeneration ? *readRuleGeneration : ++CurReadRuleGeneration,
- false,
- "",
- 0, 0, 0, 0,
- TInstant::Zero());
+ return TUserInfoBase{user, readRuleGeneration ? *readRuleGeneration : ++CurReadRuleGeneration,
+ "", 0, 0, 0, false, {}};
}
TUserInfo& TUsersInfoStorage::Create(
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index 034edacdc21..8e5faf9d0e8 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -47,13 +47,22 @@ struct TReadSpeedLimiterHolder {
TTabletCountersBase Baseline;
};
-struct TUserInfo {
- THolder<TReadSpeedLimiterHolder> ReadSpeedLimiter;
+struct TUserInfoBase {
+ TString User;
+ ui64 ReadRuleGeneration = 0;
TString Session = "";
ui32 Generation = 0;
ui32 Step = 0;
i64 Offset = 0;
+
+ bool Important = false;
+ TInstant ReadFromTimestamp;
+};
+
+struct TUserInfo: public TUserInfoBase {
+ THolder<TReadSpeedLimiterHolder> ReadSpeedLimiter;
+
TInstant WriteTimestamp;
TInstant CreateTimestamp;
TInstant ReadTimestamp;
@@ -70,12 +79,8 @@ struct TUserInfo {
//When client will commit to new position, timestamps for this offset could be in cache - not insane client should read data before commit
std::deque<std::pair<ui64, std::pair<TInstant, TInstant>>> Cache;
- bool Important = false;
- TInstant ReadFromTimestamp;
bool HasReadRule = false;
THolder<TUserLabeledCounters> LabeledCounters;
- TString User;
- ui64 ReadRuleGeneration = 0;
NPersQueue::TTopicConverterPtr TopicConverter;
std::deque<std::pair<TReadInfo, ui64>> ReadRequests;
@@ -169,11 +174,8 @@ struct TUserInfo {
const TString& dbPath, bool meterRead,
ui64 burst = 1'000'000'000, ui64 speed = 1'000'000'000
)
- : ReadSpeedLimiter(std::move(readSpeedLimiter))
- , Session(session)
- , Generation(gen)
- , Step(step)
- , Offset(offset)
+ : TUserInfoBase{user, readRuleGeneration, session, gen, step, offset, important, readFromTimestamp}
+ , ReadSpeedLimiter(std::move(readSpeedLimiter))
, WriteTimestamp(TAppData::TimeProvider->Now())
, CreateTimestamp(TAppData::TimeProvider->Now())
, ReadTimestamp(TAppData::TimeProvider->Now())
@@ -183,11 +185,7 @@ struct TUserInfo {
, ReadCreateTimestamp(TAppData::TimeProvider->Now())
, ReadOffsetRewindSum(readOffsetRewindSum)
, ReadScheduled(false)
- , Important(important)
- , ReadFromTimestamp(readFromTimestamp)
, HasReadRule(false)
- , User(user)
- , ReadRuleGeneration(readRuleGeneration)
, TopicConverter(topicConverter)
, ReadQuota(burst, speed, TAppData::TimeProvider->Now())
, Counter(nullptr)
@@ -397,8 +395,7 @@ public:
THashMap<TString, TUserInfo>& GetAll();
- TUserInfo CreateUserInfo(const TString& user,
- const TActorContext& ctx,
+ TUserInfoBase CreateUserInfo(const TString& user,
TMaybe<ui64> readRuleGeneration = {}) const;
TUserInfo& Create(
const TActorContext& ctx, const TString& user, const ui64 readRuleGeneration, bool important, const TString &session,
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index 04aae14c11c..1d5973fb6b1 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -288,12 +288,15 @@ void CheckLabeledCountersResponse(TTestContext& tc, ui32 count, TVector<TString>
THashSet<TString> groups;
+ Cerr << "NEW ANS:\n";
for (ui32 i = 0; i < result->Record.LabeledCountersByGroupSize(); ++i) {
auto& c = result->Record.GetLabeledCountersByGroup(i);
groups.insert(c.GetGroup());
+ Cerr << "ANS GROUP " << c.GetGroup() << "\n";
}
UNIT_ASSERT_VALUES_EQUAL(groups.size(), count);
for (auto& g : mustHave) {
+ Cerr << "CHECKING GROUP " << g << "\n";
UNIT_ASSERT(groups.contains(g));
}
}
@@ -365,29 +368,21 @@ Y_UNIT_TEST(ImportantFlagSwitching) {
CheckLabeledCountersResponse(tc, 11, MakeTopics({"user/1", "user2/1"}));
PQTabletPrepare({}, {{"user", true}, {"user2", false}}, tc);
- {
- TDispatchOptions options;
- options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters);
- tc.Runtime->DispatchEvents(options);
- }
- {
+ for (ui32 i = 0 ; i < 2; ++i){
TDispatchOptions options;
options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters);
tc.Runtime->DispatchEvents(options);
}
+
CheckLabeledCountersResponse(tc, 12, MakeTopics({"user/1", "user2/0"}));
PQTabletPrepare({}, {{"user", true}}, tc);
- {
- TDispatchOptions options;
- options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters);
- tc.Runtime->DispatchEvents(options);
- }
- {
+ for (ui32 i = 0 ; i < 2; ++i){
TDispatchOptions options;
options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters);
tc.Runtime->DispatchEvents(options);
}
+
CheckLabeledCountersResponse(tc, 8, MakeTopics({"user/1"}));
});
}