diff options
author | abcdef <akotov@ydb.tech> | 2023-03-09 12:20:44 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-03-09 12:20:44 +0300 |
commit | 3cb6a9ccfacc08ffc3dc9247a97ac4f2f3d87347 (patch) | |
tree | c0f4945f7ca5a49148055152055392545d02bbbc | |
parent | b576e5c308523206d523003d4429ce9832b6d9bf (diff) | |
download | ydb-3cb6a9ccfacc08ffc3dc9247a97ac4f2f3d87347.tar.gz |
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/topics/kqp_topics.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/topics/kqp_topics.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 28 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 157 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 36 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 310 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 30 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 100 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 11 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/make_config.cpp | 36 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/make_config.h | 27 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 44 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 143 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 33 |
19 files changed, 785 insertions, 192 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 12936d3c857..4911802a765 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1951,7 +1951,7 @@ private: } using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>; - using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TKqpTransaction>; + using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>; void ContinueExecute() { UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE; @@ -2115,13 +2115,13 @@ private: for (auto& [_, tx] : topicTxs) { switch (locksOp) { case NKikimrTxDataShard::TKqpLocks::Commit: - tx.SetOp(NKikimrPQ::TKqpTransaction::Commit); + tx.SetOp(NKikimrPQ::TDataTransaction::Commit); break; case NKikimrTxDataShard::TKqpLocks::Validate: - tx.SetOp(NKikimrPQ::TKqpTransaction::Validate); + tx.SetOp(NKikimrPQ::TDataTransaction::Validate); break; case NKikimrTxDataShard::TKqpLocks::Rollback: - tx.SetOp(NKikimrPQ::TKqpTransaction::Rollback); + tx.SetOp(NKikimrPQ::TDataTransaction::Rollback); break; case NKikimrTxDataShard::TKqpLocks::Unspecified: break; @@ -2303,7 +2303,7 @@ private: transaction.SetImmediate(ImmediateTx); ActorIdToProto(SelfId(), ev->Record.MutableSource()); - ev->Record.MutableTxBody()->Swap(&transaction); + ev->Record.MutableData()->Swap(&transaction); ev->Record.SetTxId(TxId); auto traceId = ExecuterSpan.GetTraceId(); diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp index 6f9aba3419b..7afb30f2f93 100644 --- a/ydb/core/kqp/topics/kqp_topics.cpp +++ b/ydb/core/kqp/topics/kqp_topics.cpp @@ -89,7 +89,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio HasWriteOperations_ = true; } -void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs) +void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs) { Y_VERIFY(TabletId_.Defined()); Y_VERIFY(Partition_.Defined()); @@ -326,7 +326,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac return true; } -void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs) +void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs) { for (auto& [_, operations] : Operations_) { operations.BuildTopicTxs(txs); diff --git a/ydb/core/kqp/topics/kqp_topics.h b/ydb/core/kqp/topics/kqp_topics.h index 70219893655..e285b1b06d9 100644 --- a/ydb/core/kqp/topics/kqp_topics.h +++ b/ydb/core/kqp/topics/kqp_topics.h @@ -50,7 +50,7 @@ public: const Ydb::Topic::OffsetsRange& range); void AddOperation(const TString& topic, ui32 partition); - void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs); + void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs); void Merge(const TTopicPartitionOperations& rhs); @@ -102,7 +102,7 @@ public: Ydb::StatusIds_StatusCode& status, TString& message); - void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs); + void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs); void Merge(const TTopicOperations& rhs); diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 83cbd94bd59..18a8190f1de 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -120,6 +120,8 @@ struct TEvPQ { EvMetering, EvTxCalcPredicate, EvTxCalcPredicateResult, + EvProposePartitionConfig, + EvProposePartitionConfigResult, EvTxCommit, EvTxCommitDone, EvTxRollback, @@ -710,6 +712,32 @@ struct TEvPQ { bool Predicate = false; }; + struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> { + TEvProposePartitionConfig(ui64 step, ui64 txId) : + Step(step), + TxId(txId) + { + } + + ui64 Step; + ui64 TxId; + NPersQueue::TTopicConverterPtr TopicConverter; + NKikimrPQ::TPQTabletConfig Config; + }; + + struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> { + TEvProposePartitionConfigResult(ui64 step, ui64 txId, ui32 partition) : + Step(step), + TxId(txId), + Partition(partition) + { + } + + ui64 Step; + ui64 TxId; + ui32 Partition; + }; + struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> { TEvTxCommit(ui64 step, ui64 txId) : Step(step), diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 19fd5a28d55..8a175527f51 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1877,7 +1877,7 @@ void TPartition::InitComplete(const TActorContext& ctx) { TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds()); FillReadFromTimestamps(Config, ctx); - + ResendPendingEvents(ctx); ProcessTxsAndUserActs(ctx); ctx.Send(ctx.SelfID, new TEvents::TEvWakeup()); @@ -2354,7 +2354,9 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record; - const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody(); + Y_VERIFY(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); + Y_VERIFY(event.HasData()); + const NKikimrPQ::TDataTransaction& txBody = event.GetData(); if (!txBody.GetImmediate()) { ReplyPropose(ctx, @@ -2375,6 +2377,33 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc ProcessTxsAndUserActs(ctx); } +void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx) +{ + PushBackDistrTx(ev->Release()); + + ProcessTxsAndUserActs(ctx); +} + +void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&) +{ + PendingEvents.emplace_back(ev->ReleaseBase().Release()); +} + +void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&) +{ + PendingEvents.emplace_back(ev->ReleaseBase().Release()); +} + +void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&) +{ + PendingEvents.emplace_back(ev->ReleaseBase().Release()); +} + +void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&) +{ + PendingEvents.emplace_back(ev->ReleaseBase().Release()); +} + void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx) { PushBackDistrTx(ev->Release()); @@ -3679,7 +3708,9 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx) Y_VERIFY(cookie == SET_OFFSET_COOKIE); if (ChangeConfig) { - EndChangePartitionConfig(*ChangeConfig, ctx); + EndChangePartitionConfig(ChangeConfig->Config, + ChangeConfig->TopicConverter, + ctx); ChangeConfig = nullptr; } @@ -3752,6 +3783,11 @@ void TPartition::PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConf DistrTxs.emplace_front(std::move(event), false); } +void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> event) +{ + DistrTxs.emplace_back(std::move(event)); +} + void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx) { ImmediateTxs.push_back(std::move(tx)); @@ -3893,6 +3929,15 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, return predicate; } +bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) +{ + ChangeConfig = + MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter, + event.Config); + SendChangeConfigReply = false; + return true; +} + void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event, const TActorContext& ctx) { @@ -3908,23 +3953,34 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event, Y_VERIFY(!DistrTxs.empty()); TTransaction& t = DistrTxs.front(); - Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); - Y_VERIFY(t.Predicate.Defined() && *t.Predicate); + if (t.Tx) { + Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); + Y_VERIFY(t.Predicate.Defined() && *t.Predicate); - for (auto& operation : t.Tx->Operations) { - TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx); + for (auto& operation : t.Tx->Operations) { + TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx); - Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin()); + Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin()); - userInfo.Offset = operation.GetEnd(); - userInfo.Session = ""; - } + userInfo.Offset = operation.GetEnd(); + userInfo.Session = ""; + } - PlanStep = t.Tx->Step; - TxId = t.Tx->TxId; - TxIdHasChanged = true; + ChangePlanStepAndTxId(t.Tx->Step, t.Tx->TxId); - ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId); + ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId); + } else if (t.ProposeConfig) { + Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.ProposeConfig)); + Y_VERIFY(t.Predicate.Defined() && *t.Predicate); + + BeginChangePartitionConfig(t.ProposeConfig->Config, ctx); + + ChangePlanStepAndTxId(t.ProposeConfig->Step, t.ProposeConfig->TxId); + + ScheduleReplyCommitDone(t.ProposeConfig->Step, t.ProposeConfig->TxId); + } else { + Y_VERIFY(t.ChangeConfig); + } RemoveDistrTx(); } @@ -3945,20 +4001,28 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event, Y_VERIFY(!DistrTxs.empty()); TTransaction& t = DistrTxs.front(); - Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); - Y_VERIFY(t.Predicate.Defined()); + if (t.Tx) { + Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx)); + Y_VERIFY(t.Predicate.Defined()); + + ChangePlanStepAndTxId(t.Tx->Step, t.Tx->TxId); + } else if (t.ProposeConfig) { + Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.ProposeConfig)); + Y_VERIFY(t.Predicate.Defined()); + + ChangePlanStepAndTxId(t.ProposeConfig->Step, t.ProposeConfig->TxId); + } else { + Y_VERIFY(t.ChangeConfig); + } - PlanStep = t.Tx->Step; - TxId = t.Tx->TxId; - TxIdHasChanged = true; RemoveDistrTx(); } -void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, +void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { - InitPendingUserInfoForImportantClients(event, ctx); + InitPendingUserInfoForImportantClients(config, ctx); TSet<TString> hasReadRule; @@ -3968,8 +4032,6 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi hasReadRule.insert(consumer); } - const NKikimrPQ::TPQTabletConfig& config = event.Config; - for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { const auto& consumer = config.GetReadRules(i); auto& userInfo = GetOrCreatePendingUser(consumer, ctx, 0); @@ -4020,15 +4082,16 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi ReportCounters(ctx); } -void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, +void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, + NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx) { - Config = event.Config; - TopicConverter = event.TopicConverter; + Config = config; + TopicConverter = topicConverter; Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0); - UsersInfoStorage->UpdateConfig(event.Config); + UsersInfoStorage->UpdateConfig(Config); WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond()); if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) { @@ -4048,8 +4111,8 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& if (Config.GetPartitionConfig().HasMirrorFrom()) { if (Mirrorer) { - ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(event.TopicConverter, - event.Config)); + ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, + Config)); } else { CreateMirrorerActor(); } @@ -4071,9 +4134,23 @@ TString TPartition::GetKeyConfig() const return Sprintf("_config_%u", Partition); } -void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event, +void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId) +{ + PlanStep = step; + TxId = txId; + TxIdHasChanged = true; +} + +void TPartition::ResendPendingEvents(const TActorContext& ctx) +{ + while (!PendingEvents.empty()) { + ctx.Schedule(TDuration::Zero(), PendingEvents.front().release()); + PendingEvents.pop_front(); + } +} + +void TPartition::InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) { - const NKikimrPQ::TPQTabletConfig& config = event.Config; TSet<TString> important; for (const auto& consumer : config.GetPartitionConfig().GetImportantClientId()) { @@ -4124,12 +4201,21 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx) *t.Predicate).Release()); TxInProgress = true; + } else if (t.ProposeConfig) { + t.Predicate = BeginTransaction(*t.ProposeConfig); + + ctx.Send(Tablet, + MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step, + t.ProposeConfig->TxId, + Partition).Release()); + + TxInProgress = true; } else { Y_VERIFY(!ChangeConfig); ChangeConfig = t.ChangeConfig; SendChangeConfigReply = t.SendReply; - BeginChangePartitionConfig(*ChangeConfig, ctx); + BeginChangePartitionConfig(ChangeConfig->Config, ctx); RemoveDistrTx(); } @@ -4149,7 +4235,10 @@ void TPartition::ProcessImmediateTxs(const TActorContext& ctx) void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx, const TActorContext& ctx) { - for (auto& operation : tx.GetTxBody().GetOperations()) { + Y_VERIFY(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); + Y_VERIFY(tx.HasData()); + + for (auto& operation : tx.GetData().GetOperations()) { Y_VERIFY(operation.HasBegin() && operation.HasEnd() && operation.HasConsumer()); Y_VERIFY(operation.GetBegin() <= (ui64)Max<i64>(), "Unexpected begin offset: %" PRIu64, operation.GetBegin()); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 6bde9bde361..181c4c22792 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -57,11 +57,19 @@ struct TTransaction { Y_VERIFY(ChangeConfig); } + explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> proposeConfig) : + ProposeConfig(proposeConfig) + { + Y_VERIFY(ProposeConfig); + } + TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> Tx; TMaybe<bool> Predicate; TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig; bool SendReply; + + TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> ProposeConfig; }; class TPartition : public TActorBootstrapped<TPartition> { @@ -212,6 +220,7 @@ private: void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event); void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event); void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event); + void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> event); void RemoveDistrTx(); void ProcessDistrTxs(const TActorContext& ctx); void ProcessDistrTx(const TActorContext& ctx); @@ -274,18 +283,20 @@ private: bool BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event, const TActorContext& ctx); + bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event); void EndTransaction(const TEvPQ::TEvTxCommit& event, const TActorContext& ctx); void EndTransaction(const TEvPQ::TEvTxRollback& event, const TActorContext& ctx); - void BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, + void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); - void EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event, + void EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config, + NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx); TString GetKeyConfig() const; - void InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event, + void InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx); void RequestConfig(const TActorContext& ctx); @@ -298,6 +309,17 @@ private: } void EmplaceResponse(TMessage&& message, const TActorContext& ctx); + void Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx); + + void HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx); + void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx); + + void ChangePlanStepAndTxId(ui64 step, ui64 txId); + + void ResendPendingEvents(const TActorContext& ctx); + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR; @@ -370,6 +392,10 @@ private: HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle); HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle); HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle); + HFuncTraced(TEvPQ::TEvTxCalcPredicate, HandleOnInit); + HFuncTraced(TEvPQ::TEvProposePartitionConfig, HandleOnInit); + HFuncTraced(TEvPQ::TEvTxCommit, HandleOnInit); + HFuncTraced(TEvPQ::TEvTxRollback, HandleOnInit); default: LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev)); break; @@ -418,6 +444,7 @@ private: HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnIdle); HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle); HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle); + HFuncTraced(TEvPQ::TEvProposePartitionConfig, Handle); HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); @@ -470,6 +497,7 @@ private: HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnWrite); HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle); HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle); + HFuncTraced(TEvPQ::TEvProposePartitionConfig, Handle); HFuncTraced(TEvPQ::TEvTxCommit, Handle); HFuncTraced(TEvPQ::TEvTxRollback, Handle); @@ -661,6 +689,8 @@ private: THolder<TMirrorerInfo> Mirrorer; TInstant LastUsedStorageMeterTimestamp; + + TDeque<std::unique_ptr<IEventBase>> PendingEvents; }; } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index fc14f81d847..fa43b019169 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -609,7 +609,39 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) // in order to answer only after all parts are ready to work Y_VERIFY(ConfigInited && PartitionsInited == Partitions.size()); - Config = NewConfig; + ApplyNewConfig(NewConfig, ctx); + + for (auto& p : Partitions) { //change config for already created partitions + ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); + } + ChangePartitionConfigInflight += Partitions.size(); + + for (const auto& partition : Config.GetPartitions()) { + const auto partitionId = partition.GetPartitionId(); + if (Partitions.find(partitionId) == Partitions.end()) { + Partitions.emplace(partitionId, TPartitionInfo( + ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter, + IsLocalDC, DCId, IsServerless, Config, *Counters, + true)), + GetPartitionKeyRange(partition), + true, + *Counters + )); + + // InitCompleted is true because this partition is empty + ++PartitionsInited; //newly created partition is empty and ready to work + } + } + + if (!ChangePartitionConfigInflight) { + OnAllPartitionConfigChanged(ctx); + } +} + +void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, + const TActorContext& ctx) +{ + Config = newConfig; if (!Config.PartitionsSize()) { for (const auto partitionId : Config.GetPartitionIds()) { @@ -618,8 +650,9 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) } ui32 cacheSize = CACHE_SIZE; - if (Config.HasCacheSize()) + if (Config.HasCacheSize()) { cacheSize = Config.GetCacheSize(); + } if (!TopicConverter) { // it's the first time TopicName = Config.GetTopicName(); @@ -649,35 +682,9 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) } InitializeMeteringSink(ctx); - - for (auto& p : Partitions) { //change config for already created partitions - ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config)); - } - ChangePartitionConfigInflight += Partitions.size(); - - for (const auto& partition : Config.GetPartitions()) { - const auto partitionId = partition.GetPartitionId(); - if (Partitions.find(partitionId) == Partitions.end()) { - Partitions.emplace(partitionId, TPartitionInfo( - ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter, - IsLocalDC, DCId, IsServerless, Config, *Counters, - true)), - GetPartitionKeyRange(partition), - true, - *Counters - )); - - // InitCompleted is true because this partition is empty - ++PartitionsInited; //newly created partition is empty and ready to work - } - } - - if (!ChangePartitionConfigInflight) { - OnAllPartitionConfigChanged(ctx); - } } -void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx) +void TPersQueue::EndWriteConfig(const NKikimrClient::TResponse& resp, const TActorContext& ctx) { if (resp.GetStatus() != NMsgBusProxy::MSTATUS_OK || resp.WriteResultSize() < 1) { @@ -954,7 +961,7 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& switch (resp.GetCookie()) { case WRITE_CONFIG_COOKIE: - HandleConfigWriteResponse(resp, ctx); + EndWriteConfig(resp, ctx); break; case READ_CONFIG_COOKIE: // read is only for config - is signal to create interal actors @@ -1323,15 +1330,39 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf << " txId " << record.GetTxId() << " config:\n" << cfg.DebugString()); TString str; - Y_VERIFY(CheckPersQueueConfig(cfg, true, &str), "%s", str.c_str()); - bool res = cfg.SerializeToString(&str); - Y_VERIFY(res); + BeginWriteConfig(cfg, bootstrapCfg, ctx); + + NewConfig = cfg; +} +void TPersQueue::BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg, + const NKikimrPQ::TBootstrapConfig& bootstrapCfg, + const TActorContext& ctx) +{ TAutoPtr<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); request->Record.SetCookie(WRITE_CONFIG_COOKIE); + AddCmdWriteConfig(request.Get(), + cfg, + bootstrapCfg, + ctx); + Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1); + + ctx.Send(ctx.SelfID, request.Release()); +} + +void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request, + const NKikimrPQ::TPQTabletConfig& cfg, + const NKikimrPQ::TBootstrapConfig& bootstrapCfg, + const TActorContext& ctx) +{ + Y_VERIFY(request); + + TString str; + Y_VERIFY(cfg.SerializeToString(&str)); + auto write = request->Record.AddCmdWrite(); write->SetKey(KeyConfig()); write->SetValue(str); @@ -1348,16 +1379,10 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf } for (const auto& partition : cfg.GetPartitions()) { - sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId()); + sourceIdWriter.FillRequest(request, partition.GetPartitionId()); } - - Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1); - - NewConfig = cfg; - ctx.Send(ctx.SelfID, request.Release()); } - void TPersQueue::Handle(TEvPersQueue::TEvDropTablet::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; @@ -2246,10 +2271,32 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction. topicPath=" << TopicPath); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction"); NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record; - const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody(); + switch (event.GetTxBodyCase()) { + case NKikimrPQ::TEvProposeTransaction::kData: + HandleDataTransaction(ev->Release(), ctx); + break; + case NKikimrPQ::TEvProposeTransaction::kConfig: + HandleConfigTransaction(ev->Release(), ctx); + break; + case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET: + SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()), + event.GetTxId(), + ctx); + break; + } + +} + +void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev, + const TActorContext& ctx) +{ + NKikimrPQ::TEvProposeTransaction& event = ev->Record; + Y_VERIFY(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData); + Y_VERIFY(event.HasData()); + const NKikimrPQ::TDataTransaction& txBody = event.GetData(); for (auto& operation : txBody.GetOperations()) { Y_VERIFY(!operation.HasPath() || (operation.GetPath() == TopicPath)); @@ -2287,17 +2334,26 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc auto i = Partitions.find(txBody.GetOperations(0).GetPartitionId()); Y_VERIFY(i != Partitions.end()); - // - // FIXME(abcdef): последовательность вызовов Release - // - ctx.Send(i->second.Actor, ev->Release().Release()); + ctx.Send(i->second.Actor, ev.Release()); } else { - EvProposeTransactionQueue.emplace_back(ev->Release().Release()); + EvProposeTransactionQueue.emplace_back(ev.Release()); TryWriteTxs(ctx); } } +void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev, + const TActorContext& ctx) +{ + NKikimrPQ::TEvProposeTransaction& event = ev->Record; + Y_VERIFY(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig); + Y_VERIFY(event.HasConfig()); + + EvProposeTransactionQueue.emplace_back(ev.Release()); + + TryWriteTxs(ctx); +} + void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx) { NKikimrTx::TEvMediatorPlanStep& event = ev->Get()->Record; @@ -2369,6 +2425,22 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC TryWriteTxs(ctx); } +void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx) +{ + const TEvPQ::TEvProposePartitionConfigResult& event = *ev->Get(); + + auto tx = GetTransaction(ctx, event.TxId); + if (!tx) { + return; + } + + tx->OnProposePartitionConfigResult(event); + + CheckTxState(ctx, *tx); + + TryWriteTxs(ctx); +} + void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& ctx) { const TEvPQ::TEvTxCommitDone& event = *ev->Get(); @@ -2401,6 +2473,7 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx) ProcessWriteTxs(ctx, request->Record); ProcessDeleteTxs(ctx, request->Record); AddCmdWriteTabletTxInfo(request->Record); + ProcessConfigTx(ctx, request.Get()); WriteTxsInProgress = true; @@ -2570,6 +2643,24 @@ void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx, DeleteTxs.clear(); } +void TPersQueue::ProcessConfigTx(const TActorContext& ctx, + TEvKeyValue::TEvRequest* request) +{ + Y_VERIFY(!WriteTxsInProgress); + + if (!TabletConfigTx.Defined()) { + return; + } + + AddCmdWriteConfig(request, + *TabletConfigTx, + *BootstrapConfigTx, + ctx); + + TabletConfigTx = Nothing(); + BootstrapConfigTx = Nothing(); +} + void TPersQueue::AddCmdWriteTabletTxInfo(NKikimrClient::TKeyValueRequest& request) { NKikimrPQ::TTabletTxInfo info; @@ -2786,10 +2877,17 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::PLANNED: if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) { - SendEvTxCalcPredicateToPartitions(ctx, tx); - - tx.PartitionRepliesCount = 0; - tx.PartitionRepliesExpected = Partitions.size(); + switch (tx.Kind) { + case NKikimrPQ::TTransaction::KIND_DATA: + SendEvTxCalcPredicateToPartitions(ctx, tx); + break; + case NKikimrPQ::TTransaction::KIND_CONFIG: + CreateNewPartitions(tx.TabletConfig, ctx); + SendEvProposePartitionConfig(ctx, tx); + break; + case NKikimrPQ::TTransaction::KIND_UNKNOWN: + Y_VERIFY(false); + } tx.State = NKikimrPQ::TTransaction::CALCULATING; } @@ -2800,11 +2898,21 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, Y_VERIFY(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected); if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { - SendEvReadSetToReceivers(ctx, tx); + switch (tx.Kind) { + case NKikimrPQ::TTransaction::KIND_DATA: + SendEvReadSetToReceivers(ctx, tx); - WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS); + WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS); - tx.State = NKikimrPQ::TTransaction::CALCULATED; + tx.State = NKikimrPQ::TTransaction::CALCULATED; + break; + case NKikimrPQ::TTransaction::KIND_CONFIG: + tx.State = NKikimrPQ::TTransaction::WAIT_RS; + CheckTxState(ctx, tx); + break; + case NKikimrPQ::TTransaction::KIND_UNKNOWN: + Y_VERIFY(false); + } } break; @@ -2824,13 +2932,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, if (tx.HaveParticipantsDecision()) { SendEvProposeTransactionResult(ctx, tx); - tx.PartitionRepliesCount = 0; if (tx.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT) { SendEvTxCommitToPartitions(ctx, tx); - tx.PartitionRepliesExpected = Partitions.size(); } else { SendEvTxRollbackToPartitions(ctx, tx); - tx.PartitionRepliesExpected = 0; } tx.State = NKikimrPQ::TTransaction::EXECUTING; @@ -2847,7 +2952,18 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, Y_VERIFY(!TxQueue.empty()); Y_VERIFY(TxQueue.front().second == tx.TxId); - SendEvReadSetAckToSenders(ctx, tx); + switch (tx.Kind) { + case NKikimrPQ::TTransaction::KIND_DATA: + SendEvReadSetAckToSenders(ctx, tx); + break; + case NKikimrPQ::TTransaction::KIND_CONFIG: + ApplyNewConfig(tx.TabletConfig, ctx); + TabletConfigTx = tx.TabletConfig; + BootstrapConfigTx = tx.BootstrapConfig; + break; + case NKikimrPQ::TTransaction::KIND_UNKNOWN: + Y_VERIFY(false); + } tx.State = NKikimrPQ::TTransaction::EXECUTED; @@ -2924,6 +3040,83 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target, ctx.Send(target, std::move(event)); } +void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, + TDistributedTransaction& tx) +{ + for (auto& [_, partition] : Partitions) { + auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId); + + event->TopicConverter = TopicConverter; + event->Config = tx.TabletConfig; + + ctx.Send(partition.Actor, std::move(event)); + } + + tx.PartitionRepliesCount = 0; + tx.PartitionRepliesExpected = Partitions.size(); +} + +void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, + const TActorContext& ctx) +{ + EnsurePartitionsAreNotDeleted(config); + + Y_VERIFY(ConfigInited && PartitionsInited == Partitions.size()); + + if (!config.PartitionsSize()) { + for (const auto partitionId : config.GetPartitionIds()) { + config.AddPartitions()->SetPartitionId(partitionId); + } + } + + for (const auto& partition : config.GetPartitions()) { + const auto partitionId = partition.GetPartitionId(); + if (Partitions.contains(partitionId)) { + continue; + } + + TActorId actorId = ctx.Register(new TPartition(TabletID(), + partitionId, + ctx.SelfID, + CacheActor, + TopicConverter, + IsLocalDC, + DCId, + IsServerless, + config, + *Counters, + true)); + + Partitions.emplace(std::piecewise_construct, + std::forward_as_tuple(partitionId), + std::forward_as_tuple(actorId, + GetPartitionKeyRange(partition), + true, + *Counters)); + + ++PartitionsInited; + } +} + +void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const +{ + THashSet<ui32> was; + + if (config.PartitionsSize()) { + for (const auto& partition : config.GetPartitions()) { + was.insert(partition.GetPartitionId()); + } + } else { + for (const auto partitionId : config.GetPartitionIds()) { + was.insert(partitionId); + } + } + + for (const auto& partition : Config.GetPartitions()) { + Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId()); + } +} + ui64 TPersQueue::GetAllowedStep() const { // @@ -2979,6 +3172,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvTxProcessing::TEvReadSet, Handle); HFuncTraced(TEvTxProcessing::TEvReadSetAck, Handle); HFuncTraced(TEvPQ::TEvTxCalcPredicateResult, Handle); + HFuncTraced(TEvPQ::TEvProposePartitionConfigResult, Handle); HFuncTraced(TEvPQ::TEvTxCommitDone, Handle); default: return false; diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index da27f90271d..a1aed6b60a7 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -91,9 +91,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { //response from KV on READ or WRITE config request void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx); - void HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx); void HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx); void ApplyNewConfigAndReply(const TActorContext& ctx); + void ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, + const TActorContext& ctx); void HandleStateWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx); void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx); @@ -221,6 +222,8 @@ private: THashMap<ui64, NKikimrPQ::TTransaction::EState> WriteTxs; THashSet<ui64> DeleteTxs; THashSet<ui64> ChangedTxs; + TMaybe<NKikimrPQ::TPQTabletConfig> TabletConfigTx; + TMaybe<NKikimrPQ::TBootstrapConfig> BootstrapConfigTx; bool WriteTxsInProgress = false; TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies; @@ -238,6 +241,8 @@ private: NKikimrClient::TKeyValueRequest& request); void ProcessDeleteTxs(const TActorContext& ctx, NKikimrClient::TKeyValueRequest& request); + void ProcessConfigTx(const TActorContext& ctx, + TEvKeyValue::TEvRequest* request); void AddCmdWriteTabletTxInfo(NKikimrClient::TKeyValueRequest& request); void ScheduleProposeTransactionResult(const TDistributedTransaction& tx); @@ -276,6 +281,29 @@ private: void SendProposeTransactionAbort(const TActorId& target, ui64 txId, const TActorContext& ctx); + + void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx); + void HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event, + const TActorContext& ctx); + void HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event, + const TActorContext& ctx); + + void SendEvProposePartitionConfig(const TActorContext& ctx, + TDistributedTransaction& tx); + + void CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, + const TActorContext& ctx); + void EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const; + + void BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg, + const NKikimrPQ::TBootstrapConfig& bootstrapCfg, + const TActorContext& ctx); + void EndWriteConfig(const NKikimrClient::TResponse& resp, + const TActorContext& ctx); + void AddCmdWriteConfig(TEvKeyValue::TEvRequest* request, + const NKikimrPQ::TPQTabletConfig& cfg, + const NKikimrPQ::TBootstrapConfig& bootstrapCfg, + const TActorContext& ctx); }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index fcdb3a455f9..f6c9d0fa77e 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -5,15 +5,34 @@ namespace NKikimr::NPQ { void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event, ui64 minStep) { + Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET); Y_VERIFY(TxId == Max<ui64>()); - const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody(); - TxId = event.GetTxId(); MinStep = minStep; MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds(); + switch (event.GetTxBodyCase()) { + case NKikimrPQ::TEvProposeTransaction::kData: + Y_VERIFY(event.HasData()); + OnProposeTransaction(event.GetData()); + break; + case NKikimrPQ::TEvProposeTransaction::kConfig: + Y_VERIFY(event.HasConfig()); + OnProposeTransaction(event.GetConfig()); + break; + case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET: + break; + } + + Source = ActorIdFromProto(event.GetSource()); +} + +void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody) +{ + Kind = NKikimrPQ::TTransaction::KIND_DATA; + for (ui64 tablet : txBody.GetSendingShards()) { Senders.insert(tablet); } @@ -22,15 +41,36 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr Receivers.insert(tablet); } - Source = ActorIdFromProto(event.GetSource()); - for (auto& operation : txBody.GetOperations()) { Operations.push_back(operation); Partitions.insert(operation.GetPartitionId()); } PartitionRepliesCount = 0; - PartitionRepliesExpected = Partitions.size(); + PartitionRepliesExpected = 0; + + ReadSetCount = 0; +} + +void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody) +{ + Kind = NKikimrPQ::TTransaction::KIND_CONFIG; + + TabletConfig = txBody.GetTabletConfig(); + BootstrapConfig = txBody.GetBootstrapConfig(); + + if (TabletConfig.PartitionsSize()) { + for (const auto& partition : TabletConfig.GetPartitions()) { + Partitions.insert(partition.GetPartitionId()); + } + } else { + for (auto partitionId : TabletConfig.GetPartitionIds()) { + Partitions.insert(partitionId); + } + } + + PartitionRepliesCount = 0; + PartitionRepliesExpected = 0; ReadSetCount = 0; } @@ -55,6 +95,16 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred ++PartitionRepliesCount; } +void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event) +{ + Y_VERIFY(Step == event.Step); + Y_VERIFY(TxId == event.TxId); + + SetDecision(NKikimrTx::TReadSetData::DECISION_COMMIT); + + ++PartitionRepliesCount; +} + void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event, const TActorId& sender, std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack) @@ -139,10 +189,36 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque { NKikimrPQ::TTransaction tx; + tx.SetKind(Kind); + if (Step != Max<ui64>()) { + tx.SetStep(Step); + } tx.SetTxId(TxId); tx.SetState(state); tx.SetMinStep(MinStep); tx.SetMaxStep(MaxStep); + + switch (Kind) { + case NKikimrPQ::TTransaction::KIND_DATA: + AddCmdWriteDataTx(tx); + break; + case NKikimrPQ::TTransaction::KIND_CONFIG: + AddCmdWriteConfigTx(tx); + break; + case NKikimrPQ::TTransaction::KIND_UNKNOWN: + break; + } + + TString value; + Y_VERIFY(tx.SerializeToString(&value)); + + auto command = request.AddCmdWrite(); + command->SetKey(GetKey()); + command->SetValue(value); +} + +void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx) +{ for (ui64 tabletId : Senders) { tx.AddSenders(tabletId); } @@ -150,22 +226,18 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque tx.AddReceivers(tabletId); } tx.MutableOperations()->Add(Operations.begin(), Operations.end()); - if (Step != Max<ui64>()) { - tx.SetStep(Step); - } if (SelfDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) { tx.SetSelfPredicate(SelfDecision); } if (ParticipantsDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) { tx.SetAggrPredicate(ParticipantsDecision); } +} - TString value; - Y_VERIFY(tx.SerializeToString(&value)); - - auto command = request.AddCmdWrite(); - command->SetKey(GetKey()); - command->SetValue(value); +void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx) +{ + *tx.MutableTabletConfig() = TabletConfig; + *tx.MutableBootstrapConfig() = BootstrapConfig; } void TDistributedTransaction::AddCmdDelete(NKikimrClient::TKeyValueRequest& request) diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index ec36d6a4151..cbe3d787aaa 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -20,8 +20,11 @@ struct TDistributedTransaction { void OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event, ui64 minStep); + void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody); + void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody); void OnPlanStep(ui64 step); void OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event); + void OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event); void OnReadSet(const NKikimrTx::TEvReadSet& event, const TActorId& sender, std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack); @@ -31,6 +34,8 @@ struct TDistributedTransaction { using EDecision = NKikimrTx::TReadSetData::EDecision; using EState = NKikimrPQ::TTransaction::EState; + NKikimrPQ::TTransaction::EKind Kind = NKikimrPQ::TTransaction::KIND_UNKNOWN; + ui64 TxId = Max<ui64>(); ui64 Step = Max<ui64>(); EState State = NKikimrPQ::TTransaction::UNKNOWN; @@ -52,6 +57,9 @@ struct TDistributedTransaction { THashMap<NActors::TActorId, std::unique_ptr<TEvTxProcessing::TEvReadSetAck>> ReadSetAcks; + NKikimrPQ::TPQTabletConfig TabletConfig; + NKikimrPQ::TBootstrapConfig BootstrapConfig; + bool WriteInProgress = false; void SetDecision(EDecision decision); @@ -68,6 +76,9 @@ struct TDistributedTransaction { static void SetDecision(NKikimrTx::TReadSetData::EDecision& var, NKikimrTx::TReadSetData::EDecision value); TString GetKey() const; + + void AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx); + void AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx); }; } diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt index fd9c7e5a3b0..4f836fa02f1 100644 --- a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt @@ -49,6 +49,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt index 1f8416b449f..24c085b88b8 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt @@ -51,6 +51,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt index a02afb44764..99cd37d063b 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt @@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp diff --git a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt index 4f0bfa21878..c05a36b5cdd 100644 --- a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt @@ -41,6 +41,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp new file mode 100644 index 00000000000..b2d16c63007 --- /dev/null +++ b/ydb/core/persqueue/ut/make_config.cpp @@ -0,0 +1,36 @@ +#include "make_config.h" + +namespace NKikimr::NPQ::NHelpers { + +NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, + const TVector<TCreateConsumerParams>& consumers, + ui32 partitionsCount) +{ + NKikimrPQ::TPQTabletConfig config; + + config.SetVersion(version); + + for (auto& c : consumers) { + config.AddReadRules(c.Consumer); + config.AddReadRuleGenerations(c.Generation); + } + + for (ui32 id = 0; id < partitionsCount; ++id) { + config.AddPartitionIds(id); + } + + config.SetTopicName("rt3.dc1--account--topic"); + config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic"); + config.SetFederationAccount("account"); + config.SetLocalDC(true); + config.SetYdbDatabasePath(""); + + return config; +} + +NKikimrPQ::TBootstrapConfig MakeBootstrapConfig() +{ + return {}; +} + +} diff --git a/ydb/core/persqueue/ut/make_config.h b/ydb/core/persqueue/ut/make_config.h new file mode 100644 index 00000000000..a4586667efb --- /dev/null +++ b/ydb/core/persqueue/ut/make_config.h @@ -0,0 +1,27 @@ +#pragma once + +#include <ydb/core/protos/pqconfig.pb.h> + +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/system/types.h> + +namespace NKikimr::NPQ::NHelpers { + +struct TCreateConsumerParams { + TString Consumer; + ui64 Offset = 0; + ui32 Generation = 0; + ui32 Step = 0; + TString Session; + ui64 OffsetRewindSum = 0; + ui64 ReadRuleGeneration = 0; +}; + +NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, + const TVector<TCreateConsumerParams>& consumers, + ui32 partitionsCount = 1); + +NKikimrPQ::TBootstrapConfig MakeBootstrapConfig(); + +} diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 41532380681..0d924642b9b 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -19,22 +19,12 @@ #include <util/generic/string.h> #include <util/system/types.h> -namespace NKikimr::NPQ { +#include "make_config.h" -Y_UNIT_TEST_SUITE(TPartitionTests) { +namespace NKikimr::NPQ { namespace NHelpers { -struct TCreateConsumerParams { - TString Consumer; - ui64 Offset = 0; - ui32 Generation = 0; - ui32 Step = 0; - TString Session; - ui64 OffsetRewindSum = 0; - ui64 ReadRuleGeneration = 0; -}; - struct TConfigParams { ui64 Version = 0; TVector<TCreateConsumerParams> Consumers; @@ -52,6 +42,8 @@ struct TCreatePartitionParams { } +Y_UNIT_TEST_SUITE(TPartitionTests) { + class TPartitionFixture : public NUnitTest::TBaseFixture { protected: struct TUserInfoMatcher { @@ -210,9 +202,6 @@ protected: ui64 begin, ui64 end, TMaybe<bool> predicate = Nothing()); - static NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version, - const TVector<TCreateConsumerParams>& consumers); - TMaybe<TTestContext> Ctx; TMaybe<TFinalizer> Finalizer; @@ -237,8 +226,6 @@ void TPartitionFixture::TearDown(NUnitTest::TTestContext&) { } - - void TPartitionFixture::CreatePartitionActor(ui32 id, const TConfigParams& config, bool newPartition, @@ -693,7 +680,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition, auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); ActorIdToProto(Ctx->Edge, event->Record.MutableSource()); - auto* body = event->Record.MutableTxBody(); + auto* body = event->Record.MutableData(); auto* operation = body->MutableOperations()->Add(); operation->SetPartitionId(partition); operation->SetBegin(begin); @@ -809,27 +796,6 @@ TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId, return TTransaction(event, predicate); } -NKikimrPQ::TPQTabletConfig TPartitionFixture::MakeConfig(ui64 version, - const TVector<TCreateConsumerParams>& consumers) -{ - NKikimrPQ::TPQTabletConfig config; - - config.SetVersion(version); - - for (auto& c : consumers) { - config.AddReadRules(c.Consumer); - config.AddReadRuleGenerations(c.Generation); - } - - config.SetTopicName("rt3.dc1--account--topic"); - config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic"); - config.SetFederationAccount("account"); - config.SetLocalDC(true); - config.SetYdbDatabasePath(""); - - return config; -} - Y_UNIT_TEST_F(Batching, TPartitionFixture) { CreatePartition(); diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 773ef094ae2..bcfa18cf72a 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -20,12 +20,11 @@ #include <util/generic/string.h> #include <util/system/types.h> +#include "make_config.h" #include "pqtablet_mock.h" namespace NKikimr::NPQ { -Y_UNIT_TEST_SUITE(TPQTabletTests) { - namespace NHelpers { struct TTxOperation { @@ -36,11 +35,17 @@ struct TTxOperation { TString Path; }; +struct TConfigParams { + TMaybe<NKikimrPQ::TPQTabletConfig> Tablet; + TMaybe<NKikimrPQ::TBootstrapConfig> Bootstrap; +}; + struct TProposeTransactionParams { ui64 TxId = 0; TVector<ui64> Senders; TVector<ui64> Receivers; TVector<TTxOperation> TxOps; + TMaybe<TConfigParams> Configs; }; struct TPlanStepParams { @@ -66,6 +71,8 @@ using TPQTabletMock = NKikimr::NPQ::NHelpers::TPQTabletMock; } +Y_UNIT_TEST_SUITE(TPQTabletTests) { + class TPQTabletFixture : public NUnitTest::TBaseFixture { protected: struct TProposeTransactionResponseMatcher { @@ -208,37 +215,43 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); THashSet<ui32> partitions; - // - // Source - // ActorIdToProto(Ctx->Edge, event->Record.MutableSource()); + event->Record.SetTxId(params.TxId); - // - // TxBody - // - auto* body = event->Record.MutableTxBody(); - for (auto& txOp : params.TxOps) { - auto* operation = body->MutableOperations()->Add(); - operation->SetPartitionId(txOp.Partition); - operation->SetBegin(txOp.Begin); - operation->SetEnd(txOp.End); - operation->SetConsumer(txOp.Consumer); - operation->SetPath(txOp.Path); - - partitions.insert(txOp.Partition); - } - for (ui64 tabletId : params.Senders) { - body->AddSendingShards(tabletId); - } - for (ui64 tabletId : params.Receivers) { - body->AddReceivingShards(tabletId); + if (params.Configs) { + // + // TxBody.Config + // + auto* body = event->Record.MutableConfig(); + if (params.Configs->Tablet.Defined()) { + *body->MutableTabletConfig() = *params.Configs->Tablet; + } + if (params.Configs->Bootstrap.Defined()) { + *body->MutableBootstrapConfig() = *params.Configs->Bootstrap; + } + } else { + // + // TxBody.Data + // + auto* body = event->Record.MutableData(); + for (auto& txOp : params.TxOps) { + auto* operation = body->MutableOperations()->Add(); + operation->SetPartitionId(txOp.Partition); + operation->SetBegin(txOp.Begin); + operation->SetEnd(txOp.End); + operation->SetConsumer(txOp.Consumer); + operation->SetPath(txOp.Path); + + partitions.insert(txOp.Partition); + } + for (ui64 tabletId : params.Senders) { + body->AddSendingShards(tabletId); + } + for (ui64 tabletId : params.Receivers) { + body->AddReceivingShards(tabletId); + } + body->SetImmediate(params.Senders.empty() && params.Receivers.empty() && (partitions.size() == 1)); } - body->SetImmediate(params.Senders.empty() && params.Receivers.empty() && (partitions.size() == 1)); - - // - // TxId - // - event->Record.SetTxId(params.TxId); SendToPipe(Ctx->Edge, event.Release()); @@ -731,6 +744,76 @@ Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture) .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); } +Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture) +{ + PQTabletPrepare({.partitions=2}, {}, *Ctx); + + const ui64 txId = 67890; + + auto tabletConfig = + NHelpers::MakeConfig(2, { + {.Consumer="client-1", .Generation=0}, + {.Consumer="client-3", .Generation=7}}, + 2); + + SendProposeTransactionRequest({.TxId=txId, + .Configs=NHelpers::TConfigParams{ + .Tablet=tabletConfig, + .Bootstrap=NHelpers::MakeBootstrapConfig() + }}); + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendPlanStep({.Step=100, .TxIds={txId}}); + + WaitPlanStepAck({.Step=100, .TxIds={txId}}); + WaitPlanStepAccepted({.Step=100}); + + WaitProposeTransactionResponse({.TxId=txId, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); +} + +Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) +{ + PQTabletPrepare({.partitions=2}, {}, *Ctx); + + const ui64 txId_2 = 67891; + const ui64 txId_3 = 67892; + + auto tabletConfig = + NHelpers::MakeConfig(2, { + {.Consumer="client-1", .Generation=1}, + {.Consumer="client-2", .Generation=1} + }, + 3); + + SendProposeTransactionRequest({.TxId=txId_2, + .Configs=NHelpers::TConfigParams{ + .Tablet=tabletConfig, + .Bootstrap=NHelpers::MakeBootstrapConfig() + }}); + SendProposeTransactionRequest({.TxId=txId_3, + .TxOps={ + {.Partition=1, .Consumer="client-2", .Begin=0, .End=0, .Path="/topic"}, + {.Partition=2, .Consumer="client-1", .Begin=0, .End=0, .Path="/topic"} + }}); + + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + WaitProposeTransactionResponse({.TxId=txId_3, + .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); + + SendPlanStep({.Step=100, .TxIds={txId_2, txId_3}}); + + WaitPlanStepAck({.Step=100, .TxIds={txId_2, txId_3}}); + WaitPlanStepAccepted({.Step=100}); + + WaitProposeTransactionResponse({.TxId=txId_2, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); + WaitProposeTransactionResponse({.TxId=txId_3, + .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); +} + } } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index f218896b49f..16675e8d294 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -786,7 +786,7 @@ message TPartitionOperation { optional string Path = 5; // topic path }; -message TKqpTransaction { +message TDataTransaction { enum ELocksOp { Unspecified = 0; Validate = 1; @@ -802,10 +802,18 @@ message TKqpTransaction { optional fixed64 LockTxId = 6; } +message TConfigTransaction { + optional TPQTabletConfig TabletConfig = 2; + optional TBootstrapConfig BootstrapConfig = 3; +} + message TEvProposeTransaction { optional NActorsProto.TActorId Source = 1; - optional TKqpTransaction TxBody = 2; - optional uint64 TxId = 3; + optional uint64 TxId = 2; + oneof TxBody { + TDataTransaction Data = 3; + TConfigTransaction Config = 4; + } }; message TEvProposeTransactionResult { @@ -844,6 +852,12 @@ message TEvPeriodicTopicStats { }; message TTransaction { + enum EKind { + KIND_UNKNOWN = 0; + KIND_DATA = 1; + KIND_CONFIG = 2; + }; + enum EState { UNKNOWN = 0; PREPARING = 1; @@ -857,16 +871,27 @@ message TTransaction { EXECUTED = 9; }; + optional EKind Kind = 11; + optional uint64 Step = 8; optional uint64 TxId = 1; optional EState State = 2; optional uint64 MinStep = 3; optional uint64 MaxStep = 4; + + // + // TDataTransaction + // repeated uint64 Senders = 5; repeated uint64 Receivers = 6; repeated TPartitionOperation Operations = 7; - optional uint64 Step = 8; optional bool SelfPredicate = 9; // только предикаты партиций. предикаты коллег отдельно optional bool AggrPredicate = 10; // заполненено одно из полей Senders or AggrPredicate + + // + // TConfigTransaction + // + optional TPQTabletConfig TabletConfig = 12; + optional TBootstrapConfig BootstrapConfig = 13; }; message TTabletTxInfo { |