diff options
author | abcdef <akotov@ydb.tech> | 2023-01-10 12:58:30 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-01-10 12:58:30 +0300 |
commit | 67791b7abe363b696b68b4529a2a0d7268363dd0 (patch) | |
tree | 6344ce1e04ee34e833ed3ecef9cae40c865fb6fd | |
parent | c28586c3736da502ce58e08c5410768106e81ef4 (diff) | |
download | ydb-67791b7abe363b696b68b4529a2a0d7268363dd0.tar.gz |
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 10 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 381 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 30 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 49 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/counters_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/user_action_processor_ut.cpp | 143 |
7 files changed, 488 insertions, 131 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 451b3d00e5..bd0b5c7894 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -123,6 +123,7 @@ struct TEvPQ { EvTxCommit, EvTxCommitDone, EvTxRollback, + EvPartitionConfigChanged, EvEnd }; @@ -464,6 +465,15 @@ struct TEvPQ { NKikimrPQ::TPQTabletConfig Config; }; + struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> { + explicit TEvPartitionConfigChanged(ui32 partition) : + Partition(partition) + { + } + + ui32 Partition; + }; + struct TEvChangeCacheConfig : public TEventLocal<TEvChangeCacheConfig, EvChangeCacheConfig> { explicit TEvChangeCacheConfig(ui32 maxSize) : MaxSize(maxSize) diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 4ccef73eda..4d71e2c644 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1908,44 +1908,9 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) { void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) { - Config = ev->Get()->Config; - TopicConverter = ev->Get()->TopicConverter; - - Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0); - - UsersInfoStorage.UpdateConfig(ev->Get()->Config); - - WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond()); - if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) { - for (auto& userInfo : UsersInfoStorage.GetAll()) { - userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2); - } - } - - for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { - auto& userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx); - userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); - } - - if (CurrentStateFunc() != &TThis::StateInit) { - InitUserInfoForImportantClients(ctx); - FillReadFromTimestamps(Config, ctx); - - ProcessTxsAndUserActs(ctx); - } + AddDistrTx(ev->Release()); - if (Config.GetPartitionConfig().HasMirrorFrom()) { - if (Mirrorer) { - ctx.Send(ev->Forward(Mirrorer->Actor)); - } else { - CreateMirrorerActor(); - } - } else { - if (Mirrorer) { - ctx.Send(Mirrorer->Actor, new TEvents::TEvPoisonPill()); - Mirrorer.Reset(); - } - } + ProcessTxsAndUserActs(ctx); } @@ -2323,37 +2288,8 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) { - if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) { - ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release()); - return; - } - } - - Y_VERIFY(TxInProgress); - - Y_VERIFY(!DistrTxs.empty()); - TTransaction& t = DistrTxs.front(); - - Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx)); - Y_VERIFY(t.Predicate.Defined() && *t.Predicate); - - for (auto& operation : t.Tx->Operations) { - TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx); - - Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin()); - - userInfo.Offset = operation.GetEnd(); - userInfo.Session = ""; - } - - PlanStep = t.Tx->Step; - TxId = t.Tx->TxId; - TxIdHasChanged = true; + EndTransaction(*ev->Get(), ctx); - ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId); - - RemoveDistrTx(); TxInProgress = false; ContinueProcessTxsAndUserActs(ctx); @@ -2361,25 +2297,8 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx) void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx) { - if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) { - return; - } - } - - Y_VERIFY(TxInProgress); - - Y_VERIFY(!DistrTxs.empty()); - TTransaction& t = DistrTxs.front(); + EndTransaction(*ev->Get(), ctx); - Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx)); - Y_VERIFY(t.Predicate.Defined()); - - PlanStep = t.Tx->Step; - TxId = t.Tx->TxId; - TxIdHasChanged = true; - - RemoveDistrTx(); TxInProgress = false; ContinueProcessTxsAndUserActs(ctx); @@ -3689,6 +3608,11 @@ void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContex void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) { Y_VERIFY(cookie == SET_OFFSET_COOKIE); + if (ChangeConfig) { + EndChangePartitionConfig(*ChangeConfig, ctx); + ChangeConfig = nullptr; + } + for (auto& user : AffectedUsers) { if (auto* actual = GetPendingUserIfExists(user)) { TUserInfo& userInfo = UsersInfoStorage.GetOrCreate(user, ctx); @@ -3700,6 +3624,14 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) userInfo.Offset = actual->Offset; userInfo.ReadRuleGeneration = actual->ReadRuleGeneration; + if (PendingHasReadRule.contains(user)) { + userInfo.HasReadRule = true; + } + + if (auto p = PendingReadFromTimestamp.find(user); p != PendingReadFromTimestamp.end()) { + userInfo.ReadFromTimestamp = p->second; + } + if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) { userInfo.ActualTimestamps = false; ReadTimestampForOffset(user, userInfo, ctx); @@ -3711,6 +3643,12 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) } } + for (auto& [consumer, important] : PendingSetImportant) { + if (auto* userInfo = UsersInfoStorage.GetIfExists(consumer); userInfo) { + userInfo->SetImportant(important); + } + } + for (auto& [actor, reply] : Replies) { ctx.Send(actor, reply.release()); } @@ -3718,6 +3656,9 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) PendingUsersInfo.clear(); Replies.clear(); AffectedUsers.clear(); + PendingReadFromTimestamp.clear(); + PendingSetImportant.clear(); + PendingHasReadRule.clear(); UsersInfoWriteInProgress = false; @@ -3731,6 +3672,11 @@ void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event) DistrTxs.emplace_back(std::move(event)); } +void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event) +{ + DistrTxs.emplace_back(std::move(event)); +} + void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx) { ImmediateTxs.push_back(std::move(tx)); @@ -3828,18 +3774,19 @@ void TPartition::ProcessDistrTxs(const TActorContext& ctx) } } -void TPartition::ProcessDistrTx(const TActorContext& ctx) +bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, + const TActorContext& ctx) { - Y_VERIFY(!TxInProgress); - - Y_VERIFY(!DistrTxs.empty()); - TTransaction& t = DistrTxs.front(); - bool predicate = true; - for (auto& operation : t.Tx->Operations) { + for (auto& operation : tx.Operations) { const TString& consumer = operation.GetConsumer(); + if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) { + predicate = false; + break; + } + if (!UsersInfoStorage.GetIfExists(consumer)) { predicate = false; break; @@ -3867,15 +3814,238 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx) } } - t.Predicate = predicate; + return predicate; +} - auto response = MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step, - t.Tx->TxId, - Partition, - predicate); - ctx.Send(Tablet, response.Release()); +void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event, + const TActorContext& ctx) +{ + if (PlanStep.Defined() && TxId.Defined()) { + if (GetStepAndTxId(event) <= GetStepAndTxId(*PlanStep, *TxId)) { + ctx.Send(Tablet, MakeCommitDone(event.Step, event.TxId).Release()); + return; + } + } + + Y_VERIFY(TxInProgress); + + Y_VERIFY(!DistrTxs.empty()); + TTransaction& t = DistrTxs.front(); + + Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); + Y_VERIFY(t.Predicate.Defined() && *t.Predicate); + + for (auto& operation : t.Tx->Operations) { + TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx); + + Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin()); + + userInfo.Offset = operation.GetEnd(); + userInfo.Session = ""; + } + + PlanStep = t.Tx->Step; + TxId = t.Tx->TxId; + TxIdHasChanged = true; + + ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId); + + RemoveDistrTx(); +} + +void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event, + const TActorContext& ctx) +{ + Y_UNUSED(ctx); + + if (PlanStep.Defined() && TxId.Defined()) { + if (GetStepAndTxId(event) <= GetStepAndTxId(*PlanStep, *TxId)) { + return; + } + } + + Y_VERIFY(TxInProgress); + + Y_VERIFY(!DistrTxs.empty()); + TTransaction& t = DistrTxs.front(); - TxInProgress = true; + Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); + Y_VERIFY(t.Predicate.Defined()); + + PlanStep = t.Tx->Step; + TxId = t.Tx->TxId; + TxIdHasChanged = true; + + RemoveDistrTx(); +} + +void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, + const TActorContext& ctx) +{ + InitPendingUserInfoForImportantClients(event, ctx); + + TSet<TString> hasReadRule; + + for (auto& [consumer, info] : UsersInfoStorage.GetAll()) { + PendingReadFromTimestamp[consumer] = TInstant::Zero(); + + hasReadRule.insert(consumer); + } + + const NKikimrPQ::TPQTabletConfig& config = event.Config; + + for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { + const auto& consumer = config.GetReadRules(i); + auto& userInfo = GetOrCreatePendingUser(consumer, ctx, 0); + + ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0; + if (userInfo.ReadRuleGeneration != rrGen) { + TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0, + TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen); + + userInfo.Session = ""; + userInfo.Offset = 0; + if (userInfo.Important) { + userInfo.Offset = StartOffset; + } + userInfo.Step = userInfo.Generation = 0; + + ProcessUserAct(act, ctx); + } + + userInfo.HasReadRule = true; + hasReadRule.erase(consumer); + + TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); + if (!ts) { + ts += TDuration::MilliSeconds(1); + } + if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) { + PendingReadFromTimestamp[consumer] = ts; + } + + PendingHasReadRule.insert(consumer); + } + + for (auto& consumer : hasReadRule) { + auto& userInfo = GetOrCreatePendingUser(consumer, ctx); + TEvPQ::TEvSetClientInfo act(0, consumer, + 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); + + ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); + + userInfo.Session = ""; + userInfo.Offset = 0; + userInfo.Step = userInfo.Generation = 0; + + ProcessUserAct(act, ctx); + } +} + +void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, + const TActorContext& ctx) +{ + Config = event.Config; + TopicConverter = event.TopicConverter; + + Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0); + + UsersInfoStorage.UpdateConfig(event.Config); + + WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond()); + if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) { + for (auto& userInfo : UsersInfoStorage.GetAll()) { + userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2); + } + } + + for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) { + auto& userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx); + userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond()); + } + + if (CurrentStateFunc() != &TThis::StateInit) { + InitUserInfoForImportantClients(ctx); + } + + if (Config.GetPartitionConfig().HasMirrorFrom()) { + if (Mirrorer) { + ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(event.TopicConverter, + event.Config)); + } else { + CreateMirrorerActor(); + } + } else { + if (Mirrorer) { + ctx.Send(Mirrorer->Actor, new TEvents::TEvPoisonPill()); + Mirrorer.Reset(); + } + } + + SchedulePartitionConfigChanged(); +} + +void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event, + const TActorContext& ctx) { + const NKikimrPQ::TPQTabletConfig& config = event.Config; + TSet<TString> important; + + for (const auto& consumer : config.GetPartitionConfig().GetImportantClientId()) { + important.insert(consumer); + + TUserInfo* userInfo = GetPendingUserIfExists(consumer); + + if (userInfo && !userInfo->Important && userInfo->LabeledCounters) { + ScheduleDropPartitionLabeledCounters(userInfo->LabeledCounters->GetGroup()); + PendingSetImportant[consumer] = true; + continue; + } + + if (!userInfo) { + userInfo = &GetOrCreatePendingUser(consumer, ctx, 0); + PendingSetImportant[consumer] = true; + } + + if (userInfo->Offset < (i64)StartOffset) { + userInfo->Offset = StartOffset; + } + + //ReadTimestampForOffset(consumer, *userInfo, ctx); + } + + for (auto& [consumer, userInfo] : UsersInfoStorage.GetAll()) { + if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) { + ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup()); + PendingSetImportant[consumer] = false; + } + } +} + +void TPartition::ProcessDistrTx(const TActorContext& ctx) +{ + Y_VERIFY(!TxInProgress); + + Y_VERIFY(!DistrTxs.empty()); + TTransaction& t = DistrTxs.front(); + + if (t.Tx) { + t.Predicate = BeginTransaction(*t.Tx, ctx); + + ctx.Send(Tablet, + MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step, + t.Tx->TxId, + Partition, + *t.Predicate).Release()); + + TxInProgress = true; + } else { + Y_VERIFY(!ChangeConfig); + + ChangeConfig = t.ChangeConfig; + BeginChangePartitionConfig(*ChangeConfig, ctx); + + RemoveDistrTx(); + } } void TPartition::ProcessImmediateTxs(const TActorContext& ctx) @@ -3949,7 +4119,6 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act, const TActorContext& ctx) { Y_VERIFY(!UsersInfoWriteInProgress); - Y_VERIFY(!UserActs.empty()); const TString& user = act.ClientId; @@ -4169,6 +4338,18 @@ void TPartition::ScheduleReplyCommitDone(ui64 step, ui64 txId) MakeCommitDone(step, txId).Release()); } +void TPartition::ScheduleDropPartitionLabeledCounters(const TString& group) +{ + Replies.emplace_back(Tablet, + MakeHolder<TEvPQ::TEvPartitionLabeledCountersDrop>(Partition, group).Release()); +} + +void TPartition::SchedulePartitionConfigChanged() +{ + Replies.emplace_back(Tablet, + MakeHolder<TEvPQ::TEvPartitionConfigChanged>(Partition).Release()); +} + void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request, const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated) { diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 1c0ed6d6b8..815d83f8ee 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -45,8 +45,16 @@ struct TTransaction { Y_VERIFY(Tx); } + explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig) : + ChangeConfig(changeConfig) + { + Y_VERIFY(ChangeConfig); + } + TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> Tx; TMaybe<bool> Predicate; + + TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig; }; class TPartition : public TActorBootstrapped<TPartition> { @@ -195,6 +203,7 @@ private: void ContinueProcessTxsAndUserActs(const TActorContext& ctx); void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event); + void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event); void RemoveDistrTx(); void ProcessDistrTxs(const TActorContext& ctx); void ProcessDistrTx(const TActorContext& ctx); @@ -226,6 +235,8 @@ private: void ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode); void ScheduleReplyCommitDone(ui64 step, ui64 txId); + void ScheduleDropPartitionLabeledCounters(const TString& group); + void SchedulePartitionConfigChanged(); void AddCmdWrite(NKikimrClient::TKeyValueRequest& request, const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated, @@ -252,6 +263,21 @@ private: NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode); THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId); + bool BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, + const TActorContext& ctx); + void EndTransaction(const TEvPQ::TEvTxCommit& event, + const TActorContext& ctx); + void EndTransaction(const TEvPQ::TEvTxRollback& event, + const TActorContext& ctx); + + void BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, + const TActorContext& ctx); + void EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, + const TActorContext& ctx); + + void InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event, + const TActorContext& ctx); + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR; @@ -490,6 +516,9 @@ private: std::deque<TTransaction> DistrTxs; THashMap<TString, size_t> UserActCount; THashMap<TString, TUserInfo> PendingUsersInfo; + THashMap<TString, TInstant> PendingReadFromTimestamp; + THashMap<TString, bool> PendingSetImportant; + THashSet<TString> PendingHasReadRule; TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies; THashSet<TString> AffectedUsers; bool UsersInfoWriteInProgress = false; @@ -497,6 +526,7 @@ private: TMaybe<ui64> PlanStep; TMaybe<ui64> TxId; bool TxIdHasChanged = false; + TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig; // // // diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index a5f1c35a79..38682b6379 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -653,6 +653,8 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) for (auto& p : Partitions) { //change config for already created partitions ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); } + ChangePartitionConfigInflight += Partitions.size(); + for (const auto& partition : Config.GetPartitions()) { const auto partitionId = partition.GetPartitionId(); if (Partitions.find(partitionId) == Partitions.end()) { @@ -669,21 +671,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) ++PartitionsInited; //newly created partition is empty and ready to work } } - for (auto& p : ChangeConfigNotification) { - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() - << " Config applied version " << Config.GetVersion() << " actor " << p.Actor - << " txId " << p.TxId << " config:\n" << Config.DebugString()); - THolder<TEvPersQueue::TEvUpdateConfigResponse> res{new TEvPersQueue::TEvUpdateConfigResponse}; - res->Record.SetStatus(NKikimrPQ::OK); - res->Record.SetTxId(p.TxId); - res->Record.SetOrigin(TabletID()); - ctx.Send(p.Actor, res.Release()); + if (!ChangePartitionConfigInflight) { + OnAllPartitionConfigChanged(ctx); } - ChangeConfigNotification.clear(); - NewConfigShouldBeApplied = false; - NewConfig.Clear(); - } void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx) @@ -1140,6 +1131,37 @@ void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorCon } +void TPersQueue::Handle(TEvPQ::TEvPartitionConfigChanged::TPtr&, const TActorContext& ctx) +{ + Y_VERIFY(ChangePartitionConfigInflight > 0); + --ChangePartitionConfigInflight; + + if (ChangePartitionConfigInflight) { + return; + } + + OnAllPartitionConfigChanged(ctx); +} + +void TPersQueue::OnAllPartitionConfigChanged(const TActorContext& ctx) +{ + for (auto& p : ChangeConfigNotification) { + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() + << " Config applied version " << Config.GetVersion() << " actor " << p.Actor + << " txId " << p.TxId << " config:\n" << Config.DebugString()); + + THolder<TEvPersQueue::TEvUpdateConfigResponse> res{new TEvPersQueue::TEvUpdateConfigResponse}; + res->Record.SetStatus(NKikimrPQ::OK); + res->Record.SetTxId(p.TxId); + res->Record.SetOrigin(TabletID()); + ctx.Send(p.Actor, res.Release()); + } + + ChangeConfigNotification.clear(); + NewConfigShouldBeApplied = false; + NewConfig.Clear(); +} + void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx) { auto& record = ev->Record; @@ -2265,6 +2287,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvPQ::TEvProxyResponse, Handle); CFunc(TEvents::TSystem::Wakeup, HandleWakeup); HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle); + HFuncTraced(TEvPQ::TEvPartitionConfigChanged, Handle); default: return false; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index c3fdd43337..407474fca6 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -65,6 +65,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { //client requests void Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvPartitionConfigChanged::TPtr& ev, const TActorContext& ctx); void ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx); void Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx); @@ -96,6 +97,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { const NKikimrClient::TPersQueuePartitionRequest::TCmdDeregisterMessageGroup& cmd, NPersQueue::NErrorCode::EErrorCode& code, TString& error) const; + void OnAllPartitionConfigChanged(const TActorContext& ctx); + //client request void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx); #define DESCRIBE_HANDLE(A) void A(const ui64 responseCookie, const TActorId& partActor, \ @@ -143,6 +146,7 @@ private: TSet<TChangeNotification> ChangeConfigNotification; NKikimrPQ::TPQTabletConfig NewConfig; bool NewConfigShouldBeApplied; + size_t ChangePartitionConfigInflight = 0; TString TopicName; TString TopicPath; diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index 8c1bce2c57..6f10cd85a7 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -314,7 +314,7 @@ Y_UNIT_TEST(ImportantFlagSwitching) { options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters); tc.Runtime->DispatchEvents(options); } - CheckLabeledCountersResponse(tc, 8, {NKikimr::JoinPath({"user/1", topicName})}); + CheckLabeledCountersResponse(tc, 8, MakeTopics({"user/1"})); PQTabletPrepare({}, {}, tc); { diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp index 275d163724..25f22548a2 100644 --- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp +++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp @@ -49,10 +49,16 @@ struct TCreateConsumerParams { class TUserActionProcessorFixture : public NUnitTest::TBaseFixture { protected: struct TUserInfoMatcher { + TMaybe<TString> Consumer; TMaybe<TString> Session; TMaybe<ui64> Offset; TMaybe<ui32> Generation; TMaybe<ui32> Step; + TMaybe<ui64> ReadRuleGeneration; + }; + + struct TDeleteRangeMatcher { + TMaybe<TString> Consumer; }; struct TCmdWriteMatcher { @@ -60,6 +66,7 @@ protected: TMaybe<ui64> PlanStep; TMaybe<ui64> TxId; THashMap<size_t, TUserInfoMatcher> UserInfos; + THashMap<size_t, TDeleteRangeMatcher> DeleteRanges; }; struct TProxyResponseMatcher { @@ -93,6 +100,10 @@ protected: TMaybe<ui32> Partition; }; + struct TChangePartitionConfigMatcher { + TMaybe<ui32> Partition; + }; + using TCreatePartitionParams = NHelpers::TCreatePartitionParams; using TCreateConsumerParams = NHelpers::TCreateConsumerParams; @@ -163,6 +174,9 @@ protected: void SendRollbackTx(ui64 step, ui64 txId); void WaitCommitTxDone(const TCommitTxDoneMatcher& matcher = {}); + void SendChangePartitionConfig(const TVector<TCreateConsumerParams>& consumers = {}); + void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {}); + TTransaction MakeTransaction(ui64 step, ui64 txId, TString consumer, ui64 begin, ui64 end, @@ -173,6 +187,9 @@ private: TMaybe<TFinalizer> Finalizer; TActorId ActorId; + + NPersQueue::TTopicConverterPtr TopicConverter; + NKikimrPQ::TPQTabletConfig Config; }; void TUserActionProcessorFixture::SetUp(NUnitTest::TTestContext&) @@ -212,31 +229,29 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id, TAutoPtr<TCounters> counters(new TCounters()); TAutoPtr<TTabletCountersBase> tabletCounters = counters->GetSecondTabletCounters().Release(); - NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1"); - NPersQueue::TTopicConverterPtr topicConverter; - NKikimrPQ::TPQTabletConfig config; - for (auto& c : consumers) { - config.AddReadRules(c.Consumer); + Config.AddReadRules(c.Consumer); } - config.SetTopicName("rt3.dc1--account--topic"); - config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic"); - config.SetFederationAccount("account"); - config.SetLocalDC(true); - config.SetYdbDatabasePath(""); + Config.SetTopicName("rt3.dc1--account--topic"); + Config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic"); + Config.SetFederationAccount("account"); + Config.SetLocalDC(true); + Config.SetYdbDatabasePath(""); + + NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1"); - topicConverter = factory.MakeTopicConverter(config); + TopicConverter = factory.MakeTopicConverter(Config); auto actor = new NPQ::TPartition(Ctx->TabletId, id, Ctx->Edge, Ctx->Edge, - topicConverter, + TopicConverter, true, "dcId", false, - config, + Config, *tabletCounters, newPartition, std::move(txs)); @@ -334,8 +349,13 @@ void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher) UNIT_ASSERT_VALUES_EQUAL(event->Record.GetCookie(), 1); // SET_OFFSET_COOKIE if (matcher.Count.Defined()) { - UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, event->Record.CmdWriteSize()); + UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, + event->Record.CmdWriteSize() + event->Record.CmdDeleteRangeSize()); } + + // + // TxMeta + // if (matcher.PlanStep.Defined()) { NKikimrPQ::TPartitionTxMeta meta; UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue())); @@ -348,10 +368,12 @@ void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher) UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, meta.GetTxId()); } + + // + // CmdWrite + // for (auto& [index, userInfo] : matcher.UserInfos) { - if (matcher.Count.Defined()) { - UNIT_ASSERT(index < *matcher.Count); - } + UNIT_ASSERT(index < event->Record.CmdWriteSize()); NKikimrPQ::TUserInfo ud; UNIT_ASSERT(ud.ParseFromString(event->Record.GetCmdWrite(index).GetValue())); @@ -372,6 +394,19 @@ void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher) UNIT_ASSERT(ud.HasOffset()); UNIT_ASSERT_VALUES_EQUAL(*userInfo.Offset, ud.GetOffset()); } + if (userInfo.ReadRuleGeneration) { + UNIT_ASSERT(ud.HasReadRuleGeneration()); + UNIT_ASSERT_VALUES_EQUAL(*userInfo.ReadRuleGeneration, ud.GetReadRuleGeneration()); + } + } + + // + // CmdDeleteRange + // + for (auto& [index, deleteRange] : matcher.DeleteRanges) { + UNIT_ASSERT(index < event->Record.CmdDeleteRangeSize()); + + Y_UNUSED(deleteRange); } } @@ -663,6 +698,31 @@ void TUserActionProcessorFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& m } } +void TUserActionProcessorFixture::SendChangePartitionConfig(const TVector<TCreateConsumerParams>& consumers) +{ + auto config = Config; + config.ClearReadRules(); + config.ClearReadRuleGenerations(); + + for (auto& c : consumers) { + config.AddReadRules(c.Consumer); + config.AddReadRuleGenerations(c.Generation); + } + + auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, config); + Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); +} + +void TUserActionProcessorFixture::WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher) +{ + auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvPartitionConfigChanged>(); + UNIT_ASSERT(event != nullptr); + + if (matcher.Partition) { + UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition); + } +} + TTransaction TUserActionProcessorFixture::MakeTransaction(ui64 step, ui64 txId, TString consumer, ui64 begin, ui64 end, @@ -1060,6 +1120,55 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TUserActionProcessorFixture) WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true}); } +Y_UNIT_TEST_F(ChangeConfig, TUserActionProcessorFixture) +{ + const ui32 partition = 3; + const ui64 begin = 0; + const ui64 end = 10; + const TString client = "client"; + const TString session = "session"; + + const ui64 step = 12345; + const ui64 txId_1 = 67890; + const ui64 txId_2 = 67891; + + CreatePartition({.Partition=partition, .Begin=begin, .End=end}, { + {.Consumer="client-1", .Offset=0, .Session="session-1"}, + {.Consumer="client-2", .Offset=0, .Session="session-2"}, + {.Consumer="client-3", .Offset=0, .Session="session-3"} + }); + + SendCalcPredicate(step, txId_1, "client-1", 0, 2); + SendChangePartitionConfig({{.Consumer="client-1", .Generation=0}, + { .Consumer="client-3", .Generation=7}}); + // + // consumer 'client-2' will be deleted + // + SendCalcPredicate(step, txId_2, "client-2", 0, 2); + + WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true}); + SendCommitTx(step, txId_1); + + // + // consumer 'client-2' was deleted + // + WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false}); + SendRollbackTx(step, txId_2); + + WaitCmdWrite({.Count=7, + .PlanStep=step, .TxId=txId_2, + .UserInfos={ + {1, {.Consumer="client-1", .Session="", .Offset=2}}, + {3, {.Consumer="client-3", .Session="", .Offset=0, .ReadRuleGeneration=7}} + }, + .DeleteRanges={ + {0, {.Consumer="client-2"}} + }}); + SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK); + + WaitPartitionConfigChanged({.Partition=partition}); +} + } // Y_UNIT_TEST_SUITE(TUserActionProcessorTests) } // namespace NKikimr::NPQ |