aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-01-10 12:58:30 +0300
committerabcdef <akotov@ydb.tech>2023-01-10 12:58:30 +0300
commit67791b7abe363b696b68b4529a2a0d7268363dd0 (patch)
tree6344ce1e04ee34e833ed3ecef9cae40c865fb6fd
parentc28586c3736da502ce58e08c5410768106e81ef4 (diff)
downloadydb-67791b7abe363b696b68b4529a2a0d7268363dd0.tar.gz
-rw-r--r--ydb/core/persqueue/events/internal.h10
-rw-r--r--ydb/core/persqueue/partition.cpp381
-rw-r--r--ydb/core/persqueue/partition.h30
-rw-r--r--ydb/core/persqueue/pq_impl.cpp49
-rw-r--r--ydb/core/persqueue/pq_impl.h4
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp2
-rw-r--r--ydb/core/persqueue/ut/user_action_processor_ut.cpp143
7 files changed, 488 insertions, 131 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 451b3d00e5..bd0b5c7894 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -123,6 +123,7 @@ struct TEvPQ {
EvTxCommit,
EvTxCommitDone,
EvTxRollback,
+ EvPartitionConfigChanged,
EvEnd
};
@@ -464,6 +465,15 @@ struct TEvPQ {
NKikimrPQ::TPQTabletConfig Config;
};
+ struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
+ explicit TEvPartitionConfigChanged(ui32 partition) :
+ Partition(partition)
+ {
+ }
+
+ ui32 Partition;
+ };
+
struct TEvChangeCacheConfig : public TEventLocal<TEvChangeCacheConfig, EvChangeCacheConfig> {
explicit TEvChangeCacheConfig(ui32 maxSize)
: MaxSize(maxSize)
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 4ccef73eda..4d71e2c644 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1908,44 +1908,9 @@ void TPartition::InitUserInfoForImportantClients(const TActorContext& ctx) {
void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx) {
- Config = ev->Get()->Config;
- TopicConverter = ev->Get()->TopicConverter;
-
- Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0);
-
- UsersInfoStorage.UpdateConfig(ev->Get()->Config);
-
- WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond());
- if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
- for (auto& userInfo : UsersInfoStorage.GetAll()) {
- userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2);
- }
- }
-
- for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
- auto& userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx);
- userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
- }
-
- if (CurrentStateFunc() != &TThis::StateInit) {
- InitUserInfoForImportantClients(ctx);
- FillReadFromTimestamps(Config, ctx);
-
- ProcessTxsAndUserActs(ctx);
- }
+ AddDistrTx(ev->Release());
- if (Config.GetPartitionConfig().HasMirrorFrom()) {
- if (Mirrorer) {
- ctx.Send(ev->Forward(Mirrorer->Actor));
- } else {
- CreateMirrorerActor();
- }
- } else {
- if (Mirrorer) {
- ctx.Send(Mirrorer->Actor, new TEvents::TEvPoisonPill());
- Mirrorer.Reset();
- }
- }
+ ProcessTxsAndUserActs(ctx);
}
@@ -2323,37 +2288,8 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
{
- if (PlanStep.Defined() && TxId.Defined()) {
- if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) {
- ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release());
- return;
- }
- }
-
- Y_VERIFY(TxInProgress);
-
- Y_VERIFY(!DistrTxs.empty());
- TTransaction& t = DistrTxs.front();
-
- Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx));
- Y_VERIFY(t.Predicate.Defined() && *t.Predicate);
-
- for (auto& operation : t.Tx->Operations) {
- TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx);
-
- Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin());
-
- userInfo.Offset = operation.GetEnd();
- userInfo.Session = "";
- }
-
- PlanStep = t.Tx->Step;
- TxId = t.Tx->TxId;
- TxIdHasChanged = true;
+ EndTransaction(*ev->Get(), ctx);
- ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId);
-
- RemoveDistrTx();
TxInProgress = false;
ContinueProcessTxsAndUserActs(ctx);
@@ -2361,25 +2297,8 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
{
- if (PlanStep.Defined() && TxId.Defined()) {
- if (GetStepAndTxId(*ev->Get()) <= GetStepAndTxId(*PlanStep, *TxId)) {
- return;
- }
- }
-
- Y_VERIFY(TxInProgress);
-
- Y_VERIFY(!DistrTxs.empty());
- TTransaction& t = DistrTxs.front();
+ EndTransaction(*ev->Get(), ctx);
- Y_VERIFY(GetStepAndTxId(*ev->Get()) == GetStepAndTxId(*t.Tx));
- Y_VERIFY(t.Predicate.Defined());
-
- PlanStep = t.Tx->Step;
- TxId = t.Tx->TxId;
- TxIdHasChanged = true;
-
- RemoveDistrTx();
TxInProgress = false;
ContinueProcessTxsAndUserActs(ctx);
@@ -3689,6 +3608,11 @@ void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContex
void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) {
Y_VERIFY(cookie == SET_OFFSET_COOKIE);
+ if (ChangeConfig) {
+ EndChangePartitionConfig(*ChangeConfig, ctx);
+ ChangeConfig = nullptr;
+ }
+
for (auto& user : AffectedUsers) {
if (auto* actual = GetPendingUserIfExists(user)) {
TUserInfo& userInfo = UsersInfoStorage.GetOrCreate(user, ctx);
@@ -3700,6 +3624,14 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
userInfo.Offset = actual->Offset;
userInfo.ReadRuleGeneration = actual->ReadRuleGeneration;
+ if (PendingHasReadRule.contains(user)) {
+ userInfo.HasReadRule = true;
+ }
+
+ if (auto p = PendingReadFromTimestamp.find(user); p != PendingReadFromTimestamp.end()) {
+ userInfo.ReadFromTimestamp = p->second;
+ }
+
if (offsetHasChanged && !userInfo.UpdateTimestampFromCache()) {
userInfo.ActualTimestamps = false;
ReadTimestampForOffset(user, userInfo, ctx);
@@ -3711,6 +3643,12 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
}
}
+ for (auto& [consumer, important] : PendingSetImportant) {
+ if (auto* userInfo = UsersInfoStorage.GetIfExists(consumer); userInfo) {
+ userInfo->SetImportant(important);
+ }
+ }
+
for (auto& [actor, reply] : Replies) {
ctx.Send(actor, reply.release());
}
@@ -3718,6 +3656,9 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
PendingUsersInfo.clear();
Replies.clear();
AffectedUsers.clear();
+ PendingReadFromTimestamp.clear();
+ PendingSetImportant.clear();
+ PendingHasReadRule.clear();
UsersInfoWriteInProgress = false;
@@ -3731,6 +3672,11 @@ void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
DistrTxs.emplace_back(std::move(event));
}
+void TPartition::AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
+{
+ DistrTxs.emplace_back(std::move(event));
+}
+
void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx)
{
ImmediateTxs.push_back(std::move(tx));
@@ -3828,18 +3774,19 @@ void TPartition::ProcessDistrTxs(const TActorContext& ctx)
}
}
-void TPartition::ProcessDistrTx(const TActorContext& ctx)
+bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
+ const TActorContext& ctx)
{
- Y_VERIFY(!TxInProgress);
-
- Y_VERIFY(!DistrTxs.empty());
- TTransaction& t = DistrTxs.front();
-
bool predicate = true;
- for (auto& operation : t.Tx->Operations) {
+ for (auto& operation : tx.Operations) {
const TString& consumer = operation.GetConsumer();
+ if (AffectedUsers.contains(consumer) && !GetPendingUserIfExists(consumer)) {
+ predicate = false;
+ break;
+ }
+
if (!UsersInfoStorage.GetIfExists(consumer)) {
predicate = false;
break;
@@ -3867,15 +3814,238 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
}
}
- t.Predicate = predicate;
+ return predicate;
+}
- auto response = MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step,
- t.Tx->TxId,
- Partition,
- predicate);
- ctx.Send(Tablet, response.Release());
+void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event,
+ const TActorContext& ctx)
+{
+ if (PlanStep.Defined() && TxId.Defined()) {
+ if (GetStepAndTxId(event) <= GetStepAndTxId(*PlanStep, *TxId)) {
+ ctx.Send(Tablet, MakeCommitDone(event.Step, event.TxId).Release());
+ return;
+ }
+ }
+
+ Y_VERIFY(TxInProgress);
+
+ Y_VERIFY(!DistrTxs.empty());
+ TTransaction& t = DistrTxs.front();
+
+ Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
+ Y_VERIFY(t.Predicate.Defined() && *t.Predicate);
+
+ for (auto& operation : t.Tx->Operations) {
+ TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx);
+
+ Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin());
+
+ userInfo.Offset = operation.GetEnd();
+ userInfo.Session = "";
+ }
+
+ PlanStep = t.Tx->Step;
+ TxId = t.Tx->TxId;
+ TxIdHasChanged = true;
+
+ ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId);
+
+ RemoveDistrTx();
+}
+
+void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event,
+ const TActorContext& ctx)
+{
+ Y_UNUSED(ctx);
+
+ if (PlanStep.Defined() && TxId.Defined()) {
+ if (GetStepAndTxId(event) <= GetStepAndTxId(*PlanStep, *TxId)) {
+ return;
+ }
+ }
+
+ Y_VERIFY(TxInProgress);
+
+ Y_VERIFY(!DistrTxs.empty());
+ TTransaction& t = DistrTxs.front();
- TxInProgress = true;
+ Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
+ Y_VERIFY(t.Predicate.Defined());
+
+ PlanStep = t.Tx->Step;
+ TxId = t.Tx->TxId;
+ TxIdHasChanged = true;
+
+ RemoveDistrTx();
+}
+
+void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+ const TActorContext& ctx)
+{
+ InitPendingUserInfoForImportantClients(event, ctx);
+
+ TSet<TString> hasReadRule;
+
+ for (auto& [consumer, info] : UsersInfoStorage.GetAll()) {
+ PendingReadFromTimestamp[consumer] = TInstant::Zero();
+
+ hasReadRule.insert(consumer);
+ }
+
+ const NKikimrPQ::TPQTabletConfig& config = event.Config;
+
+ for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
+ const auto& consumer = config.GetReadRules(i);
+ auto& userInfo = GetOrCreatePendingUser(consumer, ctx, 0);
+
+ ui64 rrGen = i < config.ReadRuleGenerationsSize() ? config.GetReadRuleGenerations(i) : 0;
+ if (userInfo.ReadRuleGeneration != rrGen) {
+ TEvPQ::TEvSetClientInfo act(0, consumer, 0, "", 0, 0,
+ TEvPQ::TEvSetClientInfo::ESCI_INIT_READ_RULE, rrGen);
+
+ userInfo.Session = "";
+ userInfo.Offset = 0;
+ if (userInfo.Important) {
+ userInfo.Offset = StartOffset;
+ }
+ userInfo.Step = userInfo.Generation = 0;
+
+ ProcessUserAct(act, ctx);
+ }
+
+ userInfo.HasReadRule = true;
+ hasReadRule.erase(consumer);
+
+ TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
+ if (!ts) {
+ ts += TDuration::MilliSeconds(1);
+ }
+ if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) {
+ PendingReadFromTimestamp[consumer] = ts;
+ }
+
+ PendingHasReadRule.insert(consumer);
+ }
+
+ for (auto& consumer : hasReadRule) {
+ auto& userInfo = GetOrCreatePendingUser(consumer, ctx);
+ TEvPQ::TEvSetClientInfo act(0, consumer,
+ 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);
+
+ ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
+
+ userInfo.Session = "";
+ userInfo.Offset = 0;
+ userInfo.Step = userInfo.Generation = 0;
+
+ ProcessUserAct(act, ctx);
+ }
+}
+
+void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+ const TActorContext& ctx)
+{
+ Config = event.Config;
+ TopicConverter = event.TopicConverter;
+
+ Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0);
+
+ UsersInfoStorage.UpdateConfig(event.Config);
+
+ WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond());
+ if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
+ for (auto& userInfo : UsersInfoStorage.GetAll()) {
+ userInfo.second.ReadQuota.UpdateConfig(Config.GetPartitionConfig().GetBurstSize() * 2, Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond() * 2);
+ }
+ }
+
+ for (const auto& readQuota : Config.GetPartitionConfig().GetReadQuota()) {
+ auto& userInfo = UsersInfoStorage.GetOrCreate(readQuota.GetClientId(), ctx);
+ userInfo.ReadQuota.UpdateConfig(readQuota.GetBurstSize(), readQuota.GetSpeedInBytesPerSecond());
+ }
+
+ if (CurrentStateFunc() != &TThis::StateInit) {
+ InitUserInfoForImportantClients(ctx);
+ }
+
+ if (Config.GetPartitionConfig().HasMirrorFrom()) {
+ if (Mirrorer) {
+ ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(event.TopicConverter,
+ event.Config));
+ } else {
+ CreateMirrorerActor();
+ }
+ } else {
+ if (Mirrorer) {
+ ctx.Send(Mirrorer->Actor, new TEvents::TEvPoisonPill());
+ Mirrorer.Reset();
+ }
+ }
+
+ SchedulePartitionConfigChanged();
+}
+
+void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
+ const TActorContext& ctx) {
+ const NKikimrPQ::TPQTabletConfig& config = event.Config;
+ TSet<TString> important;
+
+ for (const auto& consumer : config.GetPartitionConfig().GetImportantClientId()) {
+ important.insert(consumer);
+
+ TUserInfo* userInfo = GetPendingUserIfExists(consumer);
+
+ if (userInfo && !userInfo->Important && userInfo->LabeledCounters) {
+ ScheduleDropPartitionLabeledCounters(userInfo->LabeledCounters->GetGroup());
+ PendingSetImportant[consumer] = true;
+ continue;
+ }
+
+ if (!userInfo) {
+ userInfo = &GetOrCreatePendingUser(consumer, ctx, 0);
+ PendingSetImportant[consumer] = true;
+ }
+
+ if (userInfo->Offset < (i64)StartOffset) {
+ userInfo->Offset = StartOffset;
+ }
+
+ //ReadTimestampForOffset(consumer, *userInfo, ctx);
+ }
+
+ for (auto& [consumer, userInfo] : UsersInfoStorage.GetAll()) {
+ if (!important.contains(consumer) && userInfo.Important && userInfo.LabeledCounters) {
+ ScheduleDropPartitionLabeledCounters(userInfo.LabeledCounters->GetGroup());
+ PendingSetImportant[consumer] = false;
+ }
+ }
+}
+
+void TPartition::ProcessDistrTx(const TActorContext& ctx)
+{
+ Y_VERIFY(!TxInProgress);
+
+ Y_VERIFY(!DistrTxs.empty());
+ TTransaction& t = DistrTxs.front();
+
+ if (t.Tx) {
+ t.Predicate = BeginTransaction(*t.Tx, ctx);
+
+ ctx.Send(Tablet,
+ MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step,
+ t.Tx->TxId,
+ Partition,
+ *t.Predicate).Release());
+
+ TxInProgress = true;
+ } else {
+ Y_VERIFY(!ChangeConfig);
+
+ ChangeConfig = t.ChangeConfig;
+ BeginChangePartitionConfig(*ChangeConfig, ctx);
+
+ RemoveDistrTx();
+ }
}
void TPartition::ProcessImmediateTxs(const TActorContext& ctx)
@@ -3949,7 +4119,6 @@ void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
const TActorContext& ctx)
{
Y_VERIFY(!UsersInfoWriteInProgress);
- Y_VERIFY(!UserActs.empty());
const TString& user = act.ClientId;
@@ -4169,6 +4338,18 @@ void TPartition::ScheduleReplyCommitDone(ui64 step, ui64 txId)
MakeCommitDone(step, txId).Release());
}
+void TPartition::ScheduleDropPartitionLabeledCounters(const TString& group)
+{
+ Replies.emplace_back(Tablet,
+ MakeHolder<TEvPQ::TEvPartitionLabeledCountersDrop>(Partition, group).Release());
+}
+
+void TPartition::SchedulePartitionConfigChanged()
+{
+ Replies.emplace_back(Tablet,
+ MakeHolder<TEvPQ::TEvPartitionConfigChanged>(Partition).Release());
+}
+
void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated)
{
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 1c0ed6d6b8..815d83f8ee 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -45,8 +45,16 @@ struct TTransaction {
Y_VERIFY(Tx);
}
+ explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig) :
+ ChangeConfig(changeConfig)
+ {
+ Y_VERIFY(ChangeConfig);
+ }
+
TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> Tx;
TMaybe<bool> Predicate;
+
+ TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
};
class TPartition : public TActorBootstrapped<TPartition> {
@@ -195,6 +203,7 @@ private:
void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
+ void AddDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
void RemoveDistrTx();
void ProcessDistrTxs(const TActorContext& ctx);
void ProcessDistrTx(const TActorContext& ctx);
@@ -226,6 +235,8 @@ private:
void ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event,
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
void ScheduleReplyCommitDone(ui64 step, ui64 txId);
+ void ScheduleDropPartitionLabeledCounters(const TString& group);
+ void SchedulePartitionConfigChanged();
void AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated,
@@ -252,6 +263,21 @@ private:
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId);
+ bool BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event,
+ const TActorContext& ctx);
+ void EndTransaction(const TEvPQ::TEvTxCommit& event,
+ const TActorContext& ctx);
+ void EndTransaction(const TEvPQ::TEvTxRollback& event,
+ const TActorContext& ctx);
+
+ void BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+ const TActorContext& ctx);
+ void EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+ const TActorContext& ctx);
+
+ void InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
+ const TActorContext& ctx);
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -490,6 +516,9 @@ private:
std::deque<TTransaction> DistrTxs;
THashMap<TString, size_t> UserActCount;
THashMap<TString, TUserInfo> PendingUsersInfo;
+ THashMap<TString, TInstant> PendingReadFromTimestamp;
+ THashMap<TString, bool> PendingSetImportant;
+ THashSet<TString> PendingHasReadRule;
TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies;
THashSet<TString> AffectedUsers;
bool UsersInfoWriteInProgress = false;
@@ -497,6 +526,7 @@ private:
TMaybe<ui64> PlanStep;
TMaybe<ui64> TxId;
bool TxIdHasChanged = false;
+ TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
//
//
//
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index a5f1c35a79..38682b6379 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -653,6 +653,8 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
for (auto& p : Partitions) { //change config for already created partitions
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
}
+ ChangePartitionConfigInflight += Partitions.size();
+
for (const auto& partition : Config.GetPartitions()) {
const auto partitionId = partition.GetPartitionId();
if (Partitions.find(partitionId) == Partitions.end()) {
@@ -669,21 +671,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
++PartitionsInited; //newly created partition is empty and ready to work
}
}
- for (auto& p : ChangeConfigNotification) {
- LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
- << " Config applied version " << Config.GetVersion() << " actor " << p.Actor
- << " txId " << p.TxId << " config:\n" << Config.DebugString());
- THolder<TEvPersQueue::TEvUpdateConfigResponse> res{new TEvPersQueue::TEvUpdateConfigResponse};
- res->Record.SetStatus(NKikimrPQ::OK);
- res->Record.SetTxId(p.TxId);
- res->Record.SetOrigin(TabletID());
- ctx.Send(p.Actor, res.Release());
+ if (!ChangePartitionConfigInflight) {
+ OnAllPartitionConfigChanged(ctx);
}
- ChangeConfigNotification.clear();
- NewConfigShouldBeApplied = false;
- NewConfig.Clear();
-
}
void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
@@ -1140,6 +1131,37 @@ void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorCon
}
+void TPersQueue::Handle(TEvPQ::TEvPartitionConfigChanged::TPtr&, const TActorContext& ctx)
+{
+ Y_VERIFY(ChangePartitionConfigInflight > 0);
+ --ChangePartitionConfigInflight;
+
+ if (ChangePartitionConfigInflight) {
+ return;
+ }
+
+ OnAllPartitionConfigChanged(ctx);
+}
+
+void TPersQueue::OnAllPartitionConfigChanged(const TActorContext& ctx)
+{
+ for (auto& p : ChangeConfigNotification) {
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
+ << " Config applied version " << Config.GetVersion() << " actor " << p.Actor
+ << " txId " << p.TxId << " config:\n" << Config.DebugString());
+
+ THolder<TEvPersQueue::TEvUpdateConfigResponse> res{new TEvPersQueue::TEvUpdateConfigResponse};
+ res->Record.SetStatus(NKikimrPQ::OK);
+ res->Record.SetTxId(p.TxId);
+ res->Record.SetOrigin(TabletID());
+ ctx.Send(p.Actor, res.Release());
+ }
+
+ ChangeConfigNotification.clear();
+ NewConfigShouldBeApplied = false;
+ NewConfig.Clear();
+}
+
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
{
auto& record = ev->Record;
@@ -2265,6 +2287,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
+ HFuncTraced(TEvPQ::TEvPartitionConfigChanged, Handle);
default:
return false;
}
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index c3fdd43337..407474fca6 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -65,6 +65,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
//client requests
void Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvPartitionConfigChanged::TPtr& ev, const TActorContext& ctx);
void ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx);
@@ -96,6 +97,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
const NKikimrClient::TPersQueuePartitionRequest::TCmdDeregisterMessageGroup& cmd,
NPersQueue::NErrorCode::EErrorCode& code, TString& error) const;
+ void OnAllPartitionConfigChanged(const TActorContext& ctx);
+
//client request
void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx);
#define DESCRIBE_HANDLE(A) void A(const ui64 responseCookie, const TActorId& partActor, \
@@ -143,6 +146,7 @@ private:
TSet<TChangeNotification> ChangeConfigNotification;
NKikimrPQ::TPQTabletConfig NewConfig;
bool NewConfigShouldBeApplied;
+ size_t ChangePartitionConfigInflight = 0;
TString TopicName;
TString TopicPath;
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index 8c1bce2c57..6f10cd85a7 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -314,7 +314,7 @@ Y_UNIT_TEST(ImportantFlagSwitching) {
options.FinalEvents.emplace_back(TEvTabletCounters::EvTabletAddLabeledCounters);
tc.Runtime->DispatchEvents(options);
}
- CheckLabeledCountersResponse(tc, 8, {NKikimr::JoinPath({"user/1", topicName})});
+ CheckLabeledCountersResponse(tc, 8, MakeTopics({"user/1"}));
PQTabletPrepare({}, {}, tc);
{
diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
index 275d163724..25f22548a2 100644
--- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp
+++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
@@ -49,10 +49,16 @@ struct TCreateConsumerParams {
class TUserActionProcessorFixture : public NUnitTest::TBaseFixture {
protected:
struct TUserInfoMatcher {
+ TMaybe<TString> Consumer;
TMaybe<TString> Session;
TMaybe<ui64> Offset;
TMaybe<ui32> Generation;
TMaybe<ui32> Step;
+ TMaybe<ui64> ReadRuleGeneration;
+ };
+
+ struct TDeleteRangeMatcher {
+ TMaybe<TString> Consumer;
};
struct TCmdWriteMatcher {
@@ -60,6 +66,7 @@ protected:
TMaybe<ui64> PlanStep;
TMaybe<ui64> TxId;
THashMap<size_t, TUserInfoMatcher> UserInfos;
+ THashMap<size_t, TDeleteRangeMatcher> DeleteRanges;
};
struct TProxyResponseMatcher {
@@ -93,6 +100,10 @@ protected:
TMaybe<ui32> Partition;
};
+ struct TChangePartitionConfigMatcher {
+ TMaybe<ui32> Partition;
+ };
+
using TCreatePartitionParams = NHelpers::TCreatePartitionParams;
using TCreateConsumerParams = NHelpers::TCreateConsumerParams;
@@ -163,6 +174,9 @@ protected:
void SendRollbackTx(ui64 step, ui64 txId);
void WaitCommitTxDone(const TCommitTxDoneMatcher& matcher = {});
+ void SendChangePartitionConfig(const TVector<TCreateConsumerParams>& consumers = {});
+ void WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher = {});
+
TTransaction MakeTransaction(ui64 step, ui64 txId,
TString consumer,
ui64 begin, ui64 end,
@@ -173,6 +187,9 @@ private:
TMaybe<TFinalizer> Finalizer;
TActorId ActorId;
+
+ NPersQueue::TTopicConverterPtr TopicConverter;
+ NKikimrPQ::TPQTabletConfig Config;
};
void TUserActionProcessorFixture::SetUp(NUnitTest::TTestContext&)
@@ -212,31 +229,29 @@ void TUserActionProcessorFixture::CreatePartitionActor(ui32 id,
TAutoPtr<TCounters> counters(new TCounters());
TAutoPtr<TTabletCountersBase> tabletCounters = counters->GetSecondTabletCounters().Release();
- NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1");
- NPersQueue::TTopicConverterPtr topicConverter;
- NKikimrPQ::TPQTabletConfig config;
-
for (auto& c : consumers) {
- config.AddReadRules(c.Consumer);
+ Config.AddReadRules(c.Consumer);
}
- config.SetTopicName("rt3.dc1--account--topic");
- config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
- config.SetFederationAccount("account");
- config.SetLocalDC(true);
- config.SetYdbDatabasePath("");
+ Config.SetTopicName("rt3.dc1--account--topic");
+ Config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
+ Config.SetFederationAccount("account");
+ Config.SetLocalDC(true);
+ Config.SetYdbDatabasePath("");
+
+ NPersQueue::TTopicNamesConverterFactory factory(true, "/Root/PQ", "dc1");
- topicConverter = factory.MakeTopicConverter(config);
+ TopicConverter = factory.MakeTopicConverter(Config);
auto actor = new NPQ::TPartition(Ctx->TabletId,
id,
Ctx->Edge,
Ctx->Edge,
- topicConverter,
+ TopicConverter,
true,
"dcId",
false,
- config,
+ Config,
*tabletCounters,
newPartition,
std::move(txs));
@@ -334,8 +349,13 @@ void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
UNIT_ASSERT_VALUES_EQUAL(event->Record.GetCookie(), 1); // SET_OFFSET_COOKIE
if (matcher.Count.Defined()) {
- UNIT_ASSERT_VALUES_EQUAL(*matcher.Count, event->Record.CmdWriteSize());
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Count,
+ event->Record.CmdWriteSize() + event->Record.CmdDeleteRangeSize());
}
+
+ //
+ // TxMeta
+ //
if (matcher.PlanStep.Defined()) {
NKikimrPQ::TPartitionTxMeta meta;
UNIT_ASSERT(meta.ParseFromString(event->Record.GetCmdWrite(0).GetValue()));
@@ -348,10 +368,12 @@ void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, meta.GetTxId());
}
+
+ //
+ // CmdWrite
+ //
for (auto& [index, userInfo] : matcher.UserInfos) {
- if (matcher.Count.Defined()) {
- UNIT_ASSERT(index < *matcher.Count);
- }
+ UNIT_ASSERT(index < event->Record.CmdWriteSize());
NKikimrPQ::TUserInfo ud;
UNIT_ASSERT(ud.ParseFromString(event->Record.GetCmdWrite(index).GetValue()));
@@ -372,6 +394,19 @@ void TUserActionProcessorFixture::WaitCmdWrite(const TCmdWriteMatcher& matcher)
UNIT_ASSERT(ud.HasOffset());
UNIT_ASSERT_VALUES_EQUAL(*userInfo.Offset, ud.GetOffset());
}
+ if (userInfo.ReadRuleGeneration) {
+ UNIT_ASSERT(ud.HasReadRuleGeneration());
+ UNIT_ASSERT_VALUES_EQUAL(*userInfo.ReadRuleGeneration, ud.GetReadRuleGeneration());
+ }
+ }
+
+ //
+ // CmdDeleteRange
+ //
+ for (auto& [index, deleteRange] : matcher.DeleteRanges) {
+ UNIT_ASSERT(index < event->Record.CmdDeleteRangeSize());
+
+ Y_UNUSED(deleteRange);
}
}
@@ -663,6 +698,31 @@ void TUserActionProcessorFixture::WaitCommitTxDone(const TCommitTxDoneMatcher& m
}
}
+void TUserActionProcessorFixture::SendChangePartitionConfig(const TVector<TCreateConsumerParams>& consumers)
+{
+ auto config = Config;
+ config.ClearReadRules();
+ config.ClearReadRuleGenerations();
+
+ for (auto& c : consumers) {
+ config.AddReadRules(c.Consumer);
+ config.AddReadRuleGenerations(c.Generation);
+ }
+
+ auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, config);
+ Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
+}
+
+void TUserActionProcessorFixture::WaitPartitionConfigChanged(const TChangePartitionConfigMatcher& matcher)
+{
+ auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvPartitionConfigChanged>();
+ UNIT_ASSERT(event != nullptr);
+
+ if (matcher.Partition) {
+ UNIT_ASSERT_VALUES_EQUAL(*matcher.Partition, event->Partition);
+ }
+}
+
TTransaction TUserActionProcessorFixture::MakeTransaction(ui64 step, ui64 txId,
TString consumer,
ui64 begin, ui64 end,
@@ -1060,6 +1120,55 @@ Y_UNIT_TEST_F(CorrectRange_Rollback, TUserActionProcessorFixture)
WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=true});
}
+Y_UNIT_TEST_F(ChangeConfig, TUserActionProcessorFixture)
+{
+ const ui32 partition = 3;
+ const ui64 begin = 0;
+ const ui64 end = 10;
+ const TString client = "client";
+ const TString session = "session";
+
+ const ui64 step = 12345;
+ const ui64 txId_1 = 67890;
+ const ui64 txId_2 = 67891;
+
+ CreatePartition({.Partition=partition, .Begin=begin, .End=end}, {
+ {.Consumer="client-1", .Offset=0, .Session="session-1"},
+ {.Consumer="client-2", .Offset=0, .Session="session-2"},
+ {.Consumer="client-3", .Offset=0, .Session="session-3"}
+ });
+
+ SendCalcPredicate(step, txId_1, "client-1", 0, 2);
+ SendChangePartitionConfig({{.Consumer="client-1", .Generation=0},
+ { .Consumer="client-3", .Generation=7}});
+ //
+ // consumer 'client-2' will be deleted
+ //
+ SendCalcPredicate(step, txId_2, "client-2", 0, 2);
+
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_1, .Partition=partition, .Predicate=true});
+ SendCommitTx(step, txId_1);
+
+ //
+ // consumer 'client-2' was deleted
+ //
+ WaitCalcPredicateResult({.Step=step, .TxId=txId_2, .Partition=partition, .Predicate=false});
+ SendRollbackTx(step, txId_2);
+
+ WaitCmdWrite({.Count=7,
+ .PlanStep=step, .TxId=txId_2,
+ .UserInfos={
+ {1, {.Consumer="client-1", .Session="", .Offset=2}},
+ {3, {.Consumer="client-3", .Session="", .Offset=0, .ReadRuleGeneration=7}}
+ },
+ .DeleteRanges={
+ {0, {.Consumer="client-2"}}
+ }});
+ SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
+
+ WaitPartitionConfigChanged({.Partition=partition});
+}
+
} // Y_UNIT_TEST_SUITE(TUserActionProcessorTests)
} // namespace NKikimr::NPQ