aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-03-09 12:20:44 +0300
committerabcdef <akotov@ydb.tech>2023-03-09 12:20:44 +0300
commit3cb6a9ccfacc08ffc3dc9247a97ac4f2f3d87347 (patch)
treec0f4945f7ca5a49148055152055392545d02bbbc
parentb576e5c308523206d523003d4429ce9832b6d9bf (diff)
downloadydb-3cb6a9ccfacc08ffc3dc9247a97ac4f2f3d87347.tar.gz
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp10
-rw-r--r--ydb/core/kqp/topics/kqp_topics.cpp4
-rw-r--r--ydb/core/kqp/topics/kqp_topics.h4
-rw-r--r--ydb/core/persqueue/events/internal.h28
-rw-r--r--ydb/core/persqueue/partition.cpp157
-rw-r--r--ydb/core/persqueue/partition.h36
-rw-r--r--ydb/core/persqueue/pq_impl.cpp310
-rw-r--r--ydb/core/persqueue/pq_impl.h30
-rw-r--r--ydb/core/persqueue/transaction.cpp100
-rw-r--r--ydb/core/persqueue/transaction.h11
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/persqueue/ut/make_config.cpp36
-rw-r--r--ydb/core/persqueue/ut/make_config.h27
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp44
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp143
-rw-r--r--ydb/core/protos/pqconfig.proto33
19 files changed, 785 insertions, 192 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 12936d3c857..4911802a765 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1951,7 +1951,7 @@ private:
}
using TDatashardTxs = THashMap<ui64, NKikimrTxDataShard::TKqpTransaction>;
- using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TKqpTransaction>;
+ using TTopicTabletTxs = THashMap<ui64, NKikimrPQ::TDataTransaction>;
void ContinueExecute() {
UseFollowers = Request.IsolationLevel == NKikimrKqp::ISOLATION_LEVEL_READ_STALE;
@@ -2115,13 +2115,13 @@ private:
for (auto& [_, tx] : topicTxs) {
switch (locksOp) {
case NKikimrTxDataShard::TKqpLocks::Commit:
- tx.SetOp(NKikimrPQ::TKqpTransaction::Commit);
+ tx.SetOp(NKikimrPQ::TDataTransaction::Commit);
break;
case NKikimrTxDataShard::TKqpLocks::Validate:
- tx.SetOp(NKikimrPQ::TKqpTransaction::Validate);
+ tx.SetOp(NKikimrPQ::TDataTransaction::Validate);
break;
case NKikimrTxDataShard::TKqpLocks::Rollback:
- tx.SetOp(NKikimrPQ::TKqpTransaction::Rollback);
+ tx.SetOp(NKikimrPQ::TDataTransaction::Rollback);
break;
case NKikimrTxDataShard::TKqpLocks::Unspecified:
break;
@@ -2303,7 +2303,7 @@ private:
transaction.SetImmediate(ImmediateTx);
ActorIdToProto(SelfId(), ev->Record.MutableSource());
- ev->Record.MutableTxBody()->Swap(&transaction);
+ ev->Record.MutableData()->Swap(&transaction);
ev->Record.SetTxId(TxId);
auto traceId = ExecuterSpan.GetTraceId();
diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp
index 6f9aba3419b..7afb30f2f93 100644
--- a/ydb/core/kqp/topics/kqp_topics.cpp
+++ b/ydb/core/kqp/topics/kqp_topics.cpp
@@ -89,7 +89,7 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
HasWriteOperations_ = true;
}
-void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs)
+void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
{
Y_VERIFY(TabletId_.Defined());
Y_VERIFY(Partition_.Defined());
@@ -326,7 +326,7 @@ bool TTopicOperations::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCac
return true;
}
-void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs)
+void TTopicOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs)
{
for (auto& [_, operations] : Operations_) {
operations.BuildTopicTxs(txs);
diff --git a/ydb/core/kqp/topics/kqp_topics.h b/ydb/core/kqp/topics/kqp_topics.h
index 70219893655..e285b1b06d9 100644
--- a/ydb/core/kqp/topics/kqp_topics.h
+++ b/ydb/core/kqp/topics/kqp_topics.h
@@ -50,7 +50,7 @@ public:
const Ydb::Topic::OffsetsRange& range);
void AddOperation(const TString& topic, ui32 partition);
- void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs);
+ void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
void Merge(const TTopicPartitionOperations& rhs);
@@ -102,7 +102,7 @@ public:
Ydb::StatusIds_StatusCode& status,
TString& message);
- void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TKqpTransaction> &txs);
+ void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
void Merge(const TTopicOperations& rhs);
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 83cbd94bd59..18a8190f1de 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -120,6 +120,8 @@ struct TEvPQ {
EvMetering,
EvTxCalcPredicate,
EvTxCalcPredicateResult,
+ EvProposePartitionConfig,
+ EvProposePartitionConfigResult,
EvTxCommit,
EvTxCommitDone,
EvTxRollback,
@@ -710,6 +712,32 @@ struct TEvPQ {
bool Predicate = false;
};
+ struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {
+ TEvProposePartitionConfig(ui64 step, ui64 txId) :
+ Step(step),
+ TxId(txId)
+ {
+ }
+
+ ui64 Step;
+ ui64 TxId;
+ NPersQueue::TTopicConverterPtr TopicConverter;
+ NKikimrPQ::TPQTabletConfig Config;
+ };
+
+ struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {
+ TEvProposePartitionConfigResult(ui64 step, ui64 txId, ui32 partition) :
+ Step(step),
+ TxId(txId),
+ Partition(partition)
+ {
+ }
+
+ ui64 Step;
+ ui64 TxId;
+ ui32 Partition;
+ };
+
struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> {
TEvTxCommit(ui64 step, ui64 txId) :
Step(step),
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 19fd5a28d55..8a175527f51 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1877,7 +1877,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());
FillReadFromTimestamps(Config, ctx);
-
+ ResendPendingEvents(ctx);
ProcessTxsAndUserActs(ctx);
ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
@@ -2354,7 +2354,9 @@ void TPartition::Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorCo
void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
- const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody();
+ Y_VERIFY(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
+ Y_VERIFY(event.HasData());
+ const NKikimrPQ::TDataTransaction& txBody = event.GetData();
if (!txBody.GetImmediate()) {
ReplyPropose(ctx,
@@ -2375,6 +2377,33 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
ProcessTxsAndUserActs(ctx);
}
+void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx)
+{
+ PushBackDistrTx(ev->Release());
+
+ ProcessTxsAndUserActs(ctx);
+}
+
+void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
+{
+ PendingEvents.emplace_back(ev->ReleaseBase().Release());
+}
+
+void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
+{
+ PendingEvents.emplace_back(ev->ReleaseBase().Release());
+}
+
+void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&)
+{
+ PendingEvents.emplace_back(ev->ReleaseBase().Release());
+}
+
+void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
+{
+ PendingEvents.emplace_back(ev->ReleaseBase().Release());
+}
+
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
{
PushBackDistrTx(ev->Release());
@@ -3679,7 +3708,9 @@ void TPartition::HandleSetOffsetResponse(ui64 cookie, const TActorContext& ctx)
Y_VERIFY(cookie == SET_OFFSET_COOKIE);
if (ChangeConfig) {
- EndChangePartitionConfig(*ChangeConfig, ctx);
+ EndChangePartitionConfig(ChangeConfig->Config,
+ ChangeConfig->TopicConverter,
+ ctx);
ChangeConfig = nullptr;
}
@@ -3752,6 +3783,11 @@ void TPartition::PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConf
DistrTxs.emplace_front(std::move(event), false);
}
+void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> event)
+{
+ DistrTxs.emplace_back(std::move(event));
+}
+
void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx)
{
ImmediateTxs.push_back(std::move(tx));
@@ -3893,6 +3929,15 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
return predicate;
}
+bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
+{
+ ChangeConfig =
+ MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
+ event.Config);
+ SendChangeConfigReply = false;
+ return true;
+}
+
void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event,
const TActorContext& ctx)
{
@@ -3908,23 +3953,34 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event,
Y_VERIFY(!DistrTxs.empty());
TTransaction& t = DistrTxs.front();
- Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
- Y_VERIFY(t.Predicate.Defined() && *t.Predicate);
+ if (t.Tx) {
+ Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
+ Y_VERIFY(t.Predicate.Defined() && *t.Predicate);
- for (auto& operation : t.Tx->Operations) {
- TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx);
+ for (auto& operation : t.Tx->Operations) {
+ TUserInfo& userInfo = GetOrCreatePendingUser(operation.GetConsumer(), ctx);
- Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin());
+ Y_VERIFY(userInfo.Offset == (i64)operation.GetBegin());
- userInfo.Offset = operation.GetEnd();
- userInfo.Session = "";
- }
+ userInfo.Offset = operation.GetEnd();
+ userInfo.Session = "";
+ }
- PlanStep = t.Tx->Step;
- TxId = t.Tx->TxId;
- TxIdHasChanged = true;
+ ChangePlanStepAndTxId(t.Tx->Step, t.Tx->TxId);
- ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId);
+ ScheduleReplyCommitDone(t.Tx->Step, t.Tx->TxId);
+ } else if (t.ProposeConfig) {
+ Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.ProposeConfig));
+ Y_VERIFY(t.Predicate.Defined() && *t.Predicate);
+
+ BeginChangePartitionConfig(t.ProposeConfig->Config, ctx);
+
+ ChangePlanStepAndTxId(t.ProposeConfig->Step, t.ProposeConfig->TxId);
+
+ ScheduleReplyCommitDone(t.ProposeConfig->Step, t.ProposeConfig->TxId);
+ } else {
+ Y_VERIFY(t.ChangeConfig);
+ }
RemoveDistrTx();
}
@@ -3945,20 +4001,28 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event,
Y_VERIFY(!DistrTxs.empty());
TTransaction& t = DistrTxs.front();
- Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
- Y_VERIFY(t.Predicate.Defined());
+ if (t.Tx) {
+ Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
+ Y_VERIFY(t.Predicate.Defined());
+
+ ChangePlanStepAndTxId(t.Tx->Step, t.Tx->TxId);
+ } else if (t.ProposeConfig) {
+ Y_VERIFY(GetStepAndTxId(event) == GetStepAndTxId(*t.ProposeConfig));
+ Y_VERIFY(t.Predicate.Defined());
+
+ ChangePlanStepAndTxId(t.ProposeConfig->Step, t.ProposeConfig->TxId);
+ } else {
+ Y_VERIFY(t.ChangeConfig);
+ }
- PlanStep = t.Tx->Step;
- TxId = t.Tx->TxId;
- TxIdHasChanged = true;
RemoveDistrTx();
}
-void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx)
{
- InitPendingUserInfoForImportantClients(event, ctx);
+ InitPendingUserInfoForImportantClients(config, ctx);
TSet<TString> hasReadRule;
@@ -3968,8 +4032,6 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi
hasReadRule.insert(consumer);
}
- const NKikimrPQ::TPQTabletConfig& config = event.Config;
-
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
auto& userInfo = GetOrCreatePendingUser(consumer, ctx, 0);
@@ -4020,15 +4082,16 @@ void TPartition::BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfi
ReportCounters(ctx);
}
-void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
+ NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx)
{
- Config = event.Config;
- TopicConverter = event.TopicConverter;
+ Config = config;
+ TopicConverter = topicConverter;
Y_VERIFY(Config.GetPartitionConfig().GetTotalPartitions() > 0);
- UsersInfoStorage->UpdateConfig(event.Config);
+ UsersInfoStorage->UpdateConfig(Config);
WriteQuota->UpdateConfig(Config.GetPartitionConfig().GetBurstSize(), Config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond());
if (AppData(ctx)->PQConfig.GetQuotingConfig().GetPartitionReadQuotaIsTwiceWriteQuota()) {
@@ -4048,8 +4111,8 @@ void TPartition::EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig&
if (Config.GetPartitionConfig().HasMirrorFrom()) {
if (Mirrorer) {
- ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(event.TopicConverter,
- event.Config));
+ ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter,
+ Config));
} else {
CreateMirrorerActor();
}
@@ -4071,9 +4134,23 @@ TString TPartition::GetKeyConfig() const
return Sprintf("_config_%u", Partition);
}
-void TPartition::InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
+void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
+{
+ PlanStep = step;
+ TxId = txId;
+ TxIdHasChanged = true;
+}
+
+void TPartition::ResendPendingEvents(const TActorContext& ctx)
+{
+ while (!PendingEvents.empty()) {
+ ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
+ PendingEvents.pop_front();
+ }
+}
+
+void TPartition::InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx) {
- const NKikimrPQ::TPQTabletConfig& config = event.Config;
TSet<TString> important;
for (const auto& consumer : config.GetPartitionConfig().GetImportantClientId()) {
@@ -4124,12 +4201,21 @@ void TPartition::ProcessDistrTx(const TActorContext& ctx)
*t.Predicate).Release());
TxInProgress = true;
+ } else if (t.ProposeConfig) {
+ t.Predicate = BeginTransaction(*t.ProposeConfig);
+
+ ctx.Send(Tablet,
+ MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step,
+ t.ProposeConfig->TxId,
+ Partition).Release());
+
+ TxInProgress = true;
} else {
Y_VERIFY(!ChangeConfig);
ChangeConfig = t.ChangeConfig;
SendChangeConfigReply = t.SendReply;
- BeginChangePartitionConfig(*ChangeConfig, ctx);
+ BeginChangePartitionConfig(ChangeConfig->Config, ctx);
RemoveDistrTx();
}
@@ -4149,7 +4235,10 @@ void TPartition::ProcessImmediateTxs(const TActorContext& ctx)
void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
const TActorContext& ctx)
{
- for (auto& operation : tx.GetTxBody().GetOperations()) {
+ Y_VERIFY(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
+ Y_VERIFY(tx.HasData());
+
+ for (auto& operation : tx.GetData().GetOperations()) {
Y_VERIFY(operation.HasBegin() && operation.HasEnd() && operation.HasConsumer());
Y_VERIFY(operation.GetBegin() <= (ui64)Max<i64>(), "Unexpected begin offset: %" PRIu64, operation.GetBegin());
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 6bde9bde361..181c4c22792 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -57,11 +57,19 @@ struct TTransaction {
Y_VERIFY(ChangeConfig);
}
+ explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> proposeConfig) :
+ ProposeConfig(proposeConfig)
+ {
+ Y_VERIFY(ProposeConfig);
+ }
+
TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> Tx;
TMaybe<bool> Predicate;
TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
bool SendReply;
+
+ TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> ProposeConfig;
};
class TPartition : public TActorBootstrapped<TPartition> {
@@ -212,6 +220,7 @@ private:
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
+ void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> event);
void RemoveDistrTx();
void ProcessDistrTxs(const TActorContext& ctx);
void ProcessDistrTx(const TActorContext& ctx);
@@ -274,18 +283,20 @@ private:
bool BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event,
const TActorContext& ctx);
+ bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event);
void EndTransaction(const TEvPQ::TEvTxCommit& event,
const TActorContext& ctx);
void EndTransaction(const TEvPQ::TEvTxRollback& event,
const TActorContext& ctx);
- void BeginChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+ void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx);
- void EndChangePartitionConfig(const TEvPQ::TEvChangePartitionConfig& event,
+ void EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
+ NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx);
TString GetKeyConfig() const;
- void InitPendingUserInfoForImportantClients(const TEvPQ::TEvChangePartitionConfig& event,
+ void InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config,
const TActorContext& ctx);
void RequestConfig(const TActorContext& ctx);
@@ -298,6 +309,17 @@ private:
}
void EmplaceResponse(TMessage&& message, const TActorContext& ctx);
+ void Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
+
+ void HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx);
+ void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
+ void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
+ void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
+
+ void ChangePlanStepAndTxId(ui64 step, ui64 txId);
+
+ void ResendPendingEvents(const TActorContext& ctx);
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -370,6 +392,10 @@ private:
HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
HFuncTraced(NReadSpeedLimiterEvents::TEvCounters, Handle);
HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
+ HFuncTraced(TEvPQ::TEvTxCalcPredicate, HandleOnInit);
+ HFuncTraced(TEvPQ::TEvProposePartitionConfig, HandleOnInit);
+ HFuncTraced(TEvPQ::TEvTxCommit, HandleOnInit);
+ HFuncTraced(TEvPQ::TEvTxRollback, HandleOnInit);
default:
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
break;
@@ -418,6 +444,7 @@ private:
HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnIdle);
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
+ HFuncTraced(TEvPQ::TEvProposePartitionConfig, Handle);
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
@@ -470,6 +497,7 @@ private:
HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnWrite);
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
+ HFuncTraced(TEvPQ::TEvProposePartitionConfig, Handle);
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
@@ -661,6 +689,8 @@ private:
THolder<TMirrorerInfo> Mirrorer;
TInstant LastUsedStorageMeterTimestamp;
+
+ TDeque<std::unique_ptr<IEventBase>> PendingEvents;
};
} // namespace NKikimr::NPQ
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index fc14f81d847..fa43b019169 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -609,7 +609,39 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
// in order to answer only after all parts are ready to work
Y_VERIFY(ConfigInited && PartitionsInited == Partitions.size());
- Config = NewConfig;
+ ApplyNewConfig(NewConfig, ctx);
+
+ for (auto& p : Partitions) { //change config for already created partitions
+ ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
+ }
+ ChangePartitionConfigInflight += Partitions.size();
+
+ for (const auto& partition : Config.GetPartitions()) {
+ const auto partitionId = partition.GetPartitionId();
+ if (Partitions.find(partitionId) == Partitions.end()) {
+ Partitions.emplace(partitionId, TPartitionInfo(
+ ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter,
+ IsLocalDC, DCId, IsServerless, Config, *Counters,
+ true)),
+ GetPartitionKeyRange(partition),
+ true,
+ *Counters
+ ));
+
+ // InitCompleted is true because this partition is empty
+ ++PartitionsInited; //newly created partition is empty and ready to work
+ }
+ }
+
+ if (!ChangePartitionConfigInflight) {
+ OnAllPartitionConfigChanged(ctx);
+ }
+}
+
+void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
+ const TActorContext& ctx)
+{
+ Config = newConfig;
if (!Config.PartitionsSize()) {
for (const auto partitionId : Config.GetPartitionIds()) {
@@ -618,8 +650,9 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
}
ui32 cacheSize = CACHE_SIZE;
- if (Config.HasCacheSize())
+ if (Config.HasCacheSize()) {
cacheSize = Config.GetCacheSize();
+ }
if (!TopicConverter) { // it's the first time
TopicName = Config.GetTopicName();
@@ -649,35 +682,9 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
}
InitializeMeteringSink(ctx);
-
- for (auto& p : Partitions) { //change config for already created partitions
- ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
- }
- ChangePartitionConfigInflight += Partitions.size();
-
- for (const auto& partition : Config.GetPartitions()) {
- const auto partitionId = partition.GetPartitionId();
- if (Partitions.find(partitionId) == Partitions.end()) {
- Partitions.emplace(partitionId, TPartitionInfo(
- ctx.Register(new TPartition(TabletID(), partitionId, ctx.SelfID, CacheActor, TopicConverter,
- IsLocalDC, DCId, IsServerless, Config, *Counters,
- true)),
- GetPartitionKeyRange(partition),
- true,
- *Counters
- ));
-
- // InitCompleted is true because this partition is empty
- ++PartitionsInited; //newly created partition is empty and ready to work
- }
- }
-
- if (!ChangePartitionConfigInflight) {
- OnAllPartitionConfigChanged(ctx);
- }
}
-void TPersQueue::HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
+void TPersQueue::EndWriteConfig(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
{
if (resp.GetStatus() != NMsgBusProxy::MSTATUS_OK ||
resp.WriteResultSize() < 1) {
@@ -954,7 +961,7 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
switch (resp.GetCookie()) {
case WRITE_CONFIG_COOKIE:
- HandleConfigWriteResponse(resp, ctx);
+ EndWriteConfig(resp, ctx);
break;
case READ_CONFIG_COOKIE:
// read is only for config - is signal to create interal actors
@@ -1323,15 +1330,39 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
<< " txId " << record.GetTxId() << " config:\n" << cfg.DebugString());
TString str;
-
Y_VERIFY(CheckPersQueueConfig(cfg, true, &str), "%s", str.c_str());
- bool res = cfg.SerializeToString(&str);
- Y_VERIFY(res);
+ BeginWriteConfig(cfg, bootstrapCfg, ctx);
+
+ NewConfig = cfg;
+}
+void TPersQueue::BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg,
+ const NKikimrPQ::TBootstrapConfig& bootstrapCfg,
+ const TActorContext& ctx)
+{
TAutoPtr<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
request->Record.SetCookie(WRITE_CONFIG_COOKIE);
+ AddCmdWriteConfig(request.Get(),
+ cfg,
+ bootstrapCfg,
+ ctx);
+ Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1);
+
+ ctx.Send(ctx.SelfID, request.Release());
+}
+
+void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
+ const NKikimrPQ::TPQTabletConfig& cfg,
+ const NKikimrPQ::TBootstrapConfig& bootstrapCfg,
+ const TActorContext& ctx)
+{
+ Y_VERIFY(request);
+
+ TString str;
+ Y_VERIFY(cfg.SerializeToString(&str));
+
auto write = request->Record.AddCmdWrite();
write->SetKey(KeyConfig());
write->SetValue(str);
@@ -1348,16 +1379,10 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
}
for (const auto& partition : cfg.GetPartitions()) {
- sourceIdWriter.FillRequest(request.Get(), partition.GetPartitionId());
+ sourceIdWriter.FillRequest(request, partition.GetPartitionId());
}
-
- Y_VERIFY((ui64)request->Record.GetCmdWrite().size() == (ui64)bootstrapCfg.GetExplicitMessageGroups().size() * cfg.PartitionsSize() + 1);
-
- NewConfig = cfg;
- ctx.Send(ctx.SelfID, request.Release());
}
-
void TPersQueue::Handle(TEvPersQueue::TEvDropTablet::TPtr& ev, const TActorContext& ctx)
{
auto& record = ev->Get()->Record;
@@ -2246,10 +2271,32 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) {
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction. topicPath=" << TopicPath);
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction");
NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
- const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody();
+ switch (event.GetTxBodyCase()) {
+ case NKikimrPQ::TEvProposeTransaction::kData:
+ HandleDataTransaction(ev->Release(), ctx);
+ break;
+ case NKikimrPQ::TEvProposeTransaction::kConfig:
+ HandleConfigTransaction(ev->Release(), ctx);
+ break;
+ case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET:
+ SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()),
+ event.GetTxId(),
+ ctx);
+ break;
+ }
+
+}
+
+void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
+ const TActorContext& ctx)
+{
+ NKikimrPQ::TEvProposeTransaction& event = ev->Record;
+ Y_VERIFY(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
+ Y_VERIFY(event.HasData());
+ const NKikimrPQ::TDataTransaction& txBody = event.GetData();
for (auto& operation : txBody.GetOperations()) {
Y_VERIFY(!operation.HasPath() || (operation.GetPath() == TopicPath));
@@ -2287,17 +2334,26 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
auto i = Partitions.find(txBody.GetOperations(0).GetPartitionId());
Y_VERIFY(i != Partitions.end());
- //
- // FIXME(abcdef): последовательность вызовов Release
- //
- ctx.Send(i->second.Actor, ev->Release().Release());
+ ctx.Send(i->second.Actor, ev.Release());
} else {
- EvProposeTransactionQueue.emplace_back(ev->Release().Release());
+ EvProposeTransactionQueue.emplace_back(ev.Release());
TryWriteTxs(ctx);
}
}
+void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
+ const TActorContext& ctx)
+{
+ NKikimrPQ::TEvProposeTransaction& event = ev->Record;
+ Y_VERIFY(event.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kConfig);
+ Y_VERIFY(event.HasConfig());
+
+ EvProposeTransactionQueue.emplace_back(ev.Release());
+
+ TryWriteTxs(ctx);
+}
+
void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx)
{
NKikimrTx::TEvMediatorPlanStep& event = ev->Get()->Record;
@@ -2369,6 +2425,22 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
TryWriteTxs(ctx);
}
+void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx)
+{
+ const TEvPQ::TEvProposePartitionConfigResult& event = *ev->Get();
+
+ auto tx = GetTransaction(ctx, event.TxId);
+ if (!tx) {
+ return;
+ }
+
+ tx->OnProposePartitionConfigResult(event);
+
+ CheckTxState(ctx, *tx);
+
+ TryWriteTxs(ctx);
+}
+
void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& ctx)
{
const TEvPQ::TEvTxCommitDone& event = *ev->Get();
@@ -2401,6 +2473,7 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx)
ProcessWriteTxs(ctx, request->Record);
ProcessDeleteTxs(ctx, request->Record);
AddCmdWriteTabletTxInfo(request->Record);
+ ProcessConfigTx(ctx, request.Get());
WriteTxsInProgress = true;
@@ -2570,6 +2643,24 @@ void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx,
DeleteTxs.clear();
}
+void TPersQueue::ProcessConfigTx(const TActorContext& ctx,
+ TEvKeyValue::TEvRequest* request)
+{
+ Y_VERIFY(!WriteTxsInProgress);
+
+ if (!TabletConfigTx.Defined()) {
+ return;
+ }
+
+ AddCmdWriteConfig(request,
+ *TabletConfigTx,
+ *BootstrapConfigTx,
+ ctx);
+
+ TabletConfigTx = Nothing();
+ BootstrapConfigTx = Nothing();
+}
+
void TPersQueue::AddCmdWriteTabletTxInfo(NKikimrClient::TKeyValueRequest& request)
{
NKikimrPQ::TTabletTxInfo info;
@@ -2786,10 +2877,17 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::PLANNED:
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
- SendEvTxCalcPredicateToPartitions(ctx, tx);
-
- tx.PartitionRepliesCount = 0;
- tx.PartitionRepliesExpected = Partitions.size();
+ switch (tx.Kind) {
+ case NKikimrPQ::TTransaction::KIND_DATA:
+ SendEvTxCalcPredicateToPartitions(ctx, tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_CONFIG:
+ CreateNewPartitions(tx.TabletConfig, ctx);
+ SendEvProposePartitionConfig(ctx, tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_UNKNOWN:
+ Y_VERIFY(false);
+ }
tx.State = NKikimrPQ::TTransaction::CALCULATING;
}
@@ -2800,11 +2898,21 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
Y_VERIFY(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);
if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
- SendEvReadSetToReceivers(ctx, tx);
+ switch (tx.Kind) {
+ case NKikimrPQ::TTransaction::KIND_DATA:
+ SendEvReadSetToReceivers(ctx, tx);
- WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS);
+ WriteTx(tx, NKikimrPQ::TTransaction::WAIT_RS);
- tx.State = NKikimrPQ::TTransaction::CALCULATED;
+ tx.State = NKikimrPQ::TTransaction::CALCULATED;
+ break;
+ case NKikimrPQ::TTransaction::KIND_CONFIG:
+ tx.State = NKikimrPQ::TTransaction::WAIT_RS;
+ CheckTxState(ctx, tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_UNKNOWN:
+ Y_VERIFY(false);
+ }
}
break;
@@ -2824,13 +2932,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
if (tx.HaveParticipantsDecision()) {
SendEvProposeTransactionResult(ctx, tx);
- tx.PartitionRepliesCount = 0;
if (tx.GetDecision() == NKikimrTx::TReadSetData::DECISION_COMMIT) {
SendEvTxCommitToPartitions(ctx, tx);
- tx.PartitionRepliesExpected = Partitions.size();
} else {
SendEvTxRollbackToPartitions(ctx, tx);
- tx.PartitionRepliesExpected = 0;
}
tx.State = NKikimrPQ::TTransaction::EXECUTING;
@@ -2847,7 +2952,18 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
Y_VERIFY(!TxQueue.empty());
Y_VERIFY(TxQueue.front().second == tx.TxId);
- SendEvReadSetAckToSenders(ctx, tx);
+ switch (tx.Kind) {
+ case NKikimrPQ::TTransaction::KIND_DATA:
+ SendEvReadSetAckToSenders(ctx, tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_CONFIG:
+ ApplyNewConfig(tx.TabletConfig, ctx);
+ TabletConfigTx = tx.TabletConfig;
+ BootstrapConfigTx = tx.BootstrapConfig;
+ break;
+ case NKikimrPQ::TTransaction::KIND_UNKNOWN:
+ Y_VERIFY(false);
+ }
tx.State = NKikimrPQ::TTransaction::EXECUTED;
@@ -2924,6 +3040,83 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
ctx.Send(target, std::move(event));
}
+void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
+ TDistributedTransaction& tx)
+{
+ for (auto& [_, partition] : Partitions) {
+ auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId);
+
+ event->TopicConverter = TopicConverter;
+ event->Config = tx.TabletConfig;
+
+ ctx.Send(partition.Actor, std::move(event));
+ }
+
+ tx.PartitionRepliesCount = 0;
+ tx.PartitionRepliesExpected = Partitions.size();
+}
+
+void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
+ const TActorContext& ctx)
+{
+ EnsurePartitionsAreNotDeleted(config);
+
+ Y_VERIFY(ConfigInited && PartitionsInited == Partitions.size());
+
+ if (!config.PartitionsSize()) {
+ for (const auto partitionId : config.GetPartitionIds()) {
+ config.AddPartitions()->SetPartitionId(partitionId);
+ }
+ }
+
+ for (const auto& partition : config.GetPartitions()) {
+ const auto partitionId = partition.GetPartitionId();
+ if (Partitions.contains(partitionId)) {
+ continue;
+ }
+
+ TActorId actorId = ctx.Register(new TPartition(TabletID(),
+ partitionId,
+ ctx.SelfID,
+ CacheActor,
+ TopicConverter,
+ IsLocalDC,
+ DCId,
+ IsServerless,
+ config,
+ *Counters,
+ true));
+
+ Partitions.emplace(std::piecewise_construct,
+ std::forward_as_tuple(partitionId),
+ std::forward_as_tuple(actorId,
+ GetPartitionKeyRange(partition),
+ true,
+ *Counters));
+
+ ++PartitionsInited;
+ }
+}
+
+void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const
+{
+ THashSet<ui32> was;
+
+ if (config.PartitionsSize()) {
+ for (const auto& partition : config.GetPartitions()) {
+ was.insert(partition.GetPartitionId());
+ }
+ } else {
+ for (const auto partitionId : config.GetPartitionIds()) {
+ was.insert(partitionId);
+ }
+ }
+
+ for (const auto& partition : Config.GetPartitions()) {
+ Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId());
+ }
+}
+
ui64 TPersQueue::GetAllowedStep() const
{
//
@@ -2979,6 +3172,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
HFuncTraced(TEvTxProcessing::TEvReadSet, Handle);
HFuncTraced(TEvTxProcessing::TEvReadSetAck, Handle);
HFuncTraced(TEvPQ::TEvTxCalcPredicateResult, Handle);
+ HFuncTraced(TEvPQ::TEvProposePartitionConfigResult, Handle);
HFuncTraced(TEvPQ::TEvTxCommitDone, Handle);
default:
return false;
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index da27f90271d..a1aed6b60a7 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -91,9 +91,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
//response from KV on READ or WRITE config request
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
- void HandleConfigWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx);
void HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx);
void ApplyNewConfigAndReply(const TActorContext& ctx);
+ void ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
+ const TActorContext& ctx);
void HandleStateWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx);
void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx);
@@ -221,6 +222,8 @@ private:
THashMap<ui64, NKikimrPQ::TTransaction::EState> WriteTxs;
THashSet<ui64> DeleteTxs;
THashSet<ui64> ChangedTxs;
+ TMaybe<NKikimrPQ::TPQTabletConfig> TabletConfigTx;
+ TMaybe<NKikimrPQ::TBootstrapConfig> BootstrapConfigTx;
bool WriteTxsInProgress = false;
TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies;
@@ -238,6 +241,8 @@ private:
NKikimrClient::TKeyValueRequest& request);
void ProcessDeleteTxs(const TActorContext& ctx,
NKikimrClient::TKeyValueRequest& request);
+ void ProcessConfigTx(const TActorContext& ctx,
+ TEvKeyValue::TEvRequest* request);
void AddCmdWriteTabletTxInfo(NKikimrClient::TKeyValueRequest& request);
void ScheduleProposeTransactionResult(const TDistributedTransaction& tx);
@@ -276,6 +281,29 @@ private:
void SendProposeTransactionAbort(const TActorId& target,
ui64 txId,
const TActorContext& ctx);
+
+ void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx);
+ void HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event,
+ const TActorContext& ctx);
+ void HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> event,
+ const TActorContext& ctx);
+
+ void SendEvProposePartitionConfig(const TActorContext& ctx,
+ TDistributedTransaction& tx);
+
+ void CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
+ const TActorContext& ctx);
+ void EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const;
+
+ void BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg,
+ const NKikimrPQ::TBootstrapConfig& bootstrapCfg,
+ const TActorContext& ctx);
+ void EndWriteConfig(const NKikimrClient::TResponse& resp,
+ const TActorContext& ctx);
+ void AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
+ const NKikimrPQ::TPQTabletConfig& cfg,
+ const NKikimrPQ::TBootstrapConfig& bootstrapCfg,
+ const TActorContext& ctx);
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index fcdb3a455f9..f6c9d0fa77e 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -5,15 +5,34 @@ namespace NKikimr::NPQ {
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event,
ui64 minStep)
{
+ Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
Y_VERIFY(TxId == Max<ui64>());
- const NKikimrPQ::TKqpTransaction& txBody = event.GetTxBody();
-
TxId = event.GetTxId();
MinStep = minStep;
MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds();
+ switch (event.GetTxBodyCase()) {
+ case NKikimrPQ::TEvProposeTransaction::kData:
+ Y_VERIFY(event.HasData());
+ OnProposeTransaction(event.GetData());
+ break;
+ case NKikimrPQ::TEvProposeTransaction::kConfig:
+ Y_VERIFY(event.HasConfig());
+ OnProposeTransaction(event.GetConfig());
+ break;
+ case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET:
+ break;
+ }
+
+ Source = ActorIdFromProto(event.GetSource());
+}
+
+void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody)
+{
+ Kind = NKikimrPQ::TTransaction::KIND_DATA;
+
for (ui64 tablet : txBody.GetSendingShards()) {
Senders.insert(tablet);
}
@@ -22,15 +41,36 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr
Receivers.insert(tablet);
}
- Source = ActorIdFromProto(event.GetSource());
-
for (auto& operation : txBody.GetOperations()) {
Operations.push_back(operation);
Partitions.insert(operation.GetPartitionId());
}
PartitionRepliesCount = 0;
- PartitionRepliesExpected = Partitions.size();
+ PartitionRepliesExpected = 0;
+
+ ReadSetCount = 0;
+}
+
+void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody)
+{
+ Kind = NKikimrPQ::TTransaction::KIND_CONFIG;
+
+ TabletConfig = txBody.GetTabletConfig();
+ BootstrapConfig = txBody.GetBootstrapConfig();
+
+ if (TabletConfig.PartitionsSize()) {
+ for (const auto& partition : TabletConfig.GetPartitions()) {
+ Partitions.insert(partition.GetPartitionId());
+ }
+ } else {
+ for (auto partitionId : TabletConfig.GetPartitionIds()) {
+ Partitions.insert(partitionId);
+ }
+ }
+
+ PartitionRepliesCount = 0;
+ PartitionRepliesExpected = 0;
ReadSetCount = 0;
}
@@ -55,6 +95,16 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred
++PartitionRepliesCount;
}
+void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event)
+{
+ Y_VERIFY(Step == event.Step);
+ Y_VERIFY(TxId == event.TxId);
+
+ SetDecision(NKikimrTx::TReadSetData::DECISION_COMMIT);
+
+ ++PartitionRepliesCount;
+}
+
void TDistributedTransaction::OnReadSet(const NKikimrTx::TEvReadSet& event,
const TActorId& sender,
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack)
@@ -139,10 +189,36 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque
{
NKikimrPQ::TTransaction tx;
+ tx.SetKind(Kind);
+ if (Step != Max<ui64>()) {
+ tx.SetStep(Step);
+ }
tx.SetTxId(TxId);
tx.SetState(state);
tx.SetMinStep(MinStep);
tx.SetMaxStep(MaxStep);
+
+ switch (Kind) {
+ case NKikimrPQ::TTransaction::KIND_DATA:
+ AddCmdWriteDataTx(tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_CONFIG:
+ AddCmdWriteConfigTx(tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_UNKNOWN:
+ break;
+ }
+
+ TString value;
+ Y_VERIFY(tx.SerializeToString(&value));
+
+ auto command = request.AddCmdWrite();
+ command->SetKey(GetKey());
+ command->SetValue(value);
+}
+
+void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx)
+{
for (ui64 tabletId : Senders) {
tx.AddSenders(tabletId);
}
@@ -150,22 +226,18 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque
tx.AddReceivers(tabletId);
}
tx.MutableOperations()->Add(Operations.begin(), Operations.end());
- if (Step != Max<ui64>()) {
- tx.SetStep(Step);
- }
if (SelfDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) {
tx.SetSelfPredicate(SelfDecision);
}
if (ParticipantsDecision != NKikimrTx::TReadSetData::DECISION_UNKNOWN) {
tx.SetAggrPredicate(ParticipantsDecision);
}
+}
- TString value;
- Y_VERIFY(tx.SerializeToString(&value));
-
- auto command = request.AddCmdWrite();
- command->SetKey(GetKey());
- command->SetValue(value);
+void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx)
+{
+ *tx.MutableTabletConfig() = TabletConfig;
+ *tx.MutableBootstrapConfig() = BootstrapConfig;
}
void TDistributedTransaction::AddCmdDelete(NKikimrClient::TKeyValueRequest& request)
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index ec36d6a4151..cbe3d787aaa 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -20,8 +20,11 @@ struct TDistributedTransaction {
void OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event,
ui64 minStep);
+ void OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody);
+ void OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody);
void OnPlanStep(ui64 step);
void OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event);
+ void OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event);
void OnReadSet(const NKikimrTx::TEvReadSet& event,
const TActorId& sender,
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack);
@@ -31,6 +34,8 @@ struct TDistributedTransaction {
using EDecision = NKikimrTx::TReadSetData::EDecision;
using EState = NKikimrPQ::TTransaction::EState;
+ NKikimrPQ::TTransaction::EKind Kind = NKikimrPQ::TTransaction::KIND_UNKNOWN;
+
ui64 TxId = Max<ui64>();
ui64 Step = Max<ui64>();
EState State = NKikimrPQ::TTransaction::UNKNOWN;
@@ -52,6 +57,9 @@ struct TDistributedTransaction {
THashMap<NActors::TActorId, std::unique_ptr<TEvTxProcessing::TEvReadSetAck>> ReadSetAcks;
+ NKikimrPQ::TPQTabletConfig TabletConfig;
+ NKikimrPQ::TBootstrapConfig BootstrapConfig;
+
bool WriteInProgress = false;
void SetDecision(EDecision decision);
@@ -68,6 +76,9 @@ struct TDistributedTransaction {
static void SetDecision(NKikimrTx::TReadSetData::EDecision& var, NKikimrTx::TReadSetData::EDecision value);
TString GetKey() const;
+
+ void AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx);
+ void AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx);
};
}
diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
index fd9c7e5a3b0..4f836fa02f1 100644
--- a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
@@ -49,6 +49,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
index 1f8416b449f..24c085b88b8 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
@@ -51,6 +51,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
index a02afb44764..99cd37d063b 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
@@ -53,6 +53,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
diff --git a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
index 4f0bfa21878..c05a36b5cdd 100644
--- a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
@@ -41,6 +41,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/counters_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqtablet_mock.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/internals_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/make_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/metering_sink_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/mirrorer_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pq_ut.cpp
diff --git a/ydb/core/persqueue/ut/make_config.cpp b/ydb/core/persqueue/ut/make_config.cpp
new file mode 100644
index 00000000000..b2d16c63007
--- /dev/null
+++ b/ydb/core/persqueue/ut/make_config.cpp
@@ -0,0 +1,36 @@
+#include "make_config.h"
+
+namespace NKikimr::NPQ::NHelpers {
+
+NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
+ const TVector<TCreateConsumerParams>& consumers,
+ ui32 partitionsCount)
+{
+ NKikimrPQ::TPQTabletConfig config;
+
+ config.SetVersion(version);
+
+ for (auto& c : consumers) {
+ config.AddReadRules(c.Consumer);
+ config.AddReadRuleGenerations(c.Generation);
+ }
+
+ for (ui32 id = 0; id < partitionsCount; ++id) {
+ config.AddPartitionIds(id);
+ }
+
+ config.SetTopicName("rt3.dc1--account--topic");
+ config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
+ config.SetFederationAccount("account");
+ config.SetLocalDC(true);
+ config.SetYdbDatabasePath("");
+
+ return config;
+}
+
+NKikimrPQ::TBootstrapConfig MakeBootstrapConfig()
+{
+ return {};
+}
+
+}
diff --git a/ydb/core/persqueue/ut/make_config.h b/ydb/core/persqueue/ut/make_config.h
new file mode 100644
index 00000000000..a4586667efb
--- /dev/null
+++ b/ydb/core/persqueue/ut/make_config.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <ydb/core/protos/pqconfig.pb.h>
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/system/types.h>
+
+namespace NKikimr::NPQ::NHelpers {
+
+struct TCreateConsumerParams {
+ TString Consumer;
+ ui64 Offset = 0;
+ ui32 Generation = 0;
+ ui32 Step = 0;
+ TString Session;
+ ui64 OffsetRewindSum = 0;
+ ui64 ReadRuleGeneration = 0;
+};
+
+NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
+ const TVector<TCreateConsumerParams>& consumers,
+ ui32 partitionsCount = 1);
+
+NKikimrPQ::TBootstrapConfig MakeBootstrapConfig();
+
+}
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index 41532380681..0d924642b9b 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -19,22 +19,12 @@
#include <util/generic/string.h>
#include <util/system/types.h>
-namespace NKikimr::NPQ {
+#include "make_config.h"
-Y_UNIT_TEST_SUITE(TPartitionTests) {
+namespace NKikimr::NPQ {
namespace NHelpers {
-struct TCreateConsumerParams {
- TString Consumer;
- ui64 Offset = 0;
- ui32 Generation = 0;
- ui32 Step = 0;
- TString Session;
- ui64 OffsetRewindSum = 0;
- ui64 ReadRuleGeneration = 0;
-};
-
struct TConfigParams {
ui64 Version = 0;
TVector<TCreateConsumerParams> Consumers;
@@ -52,6 +42,8 @@ struct TCreatePartitionParams {
}
+Y_UNIT_TEST_SUITE(TPartitionTests) {
+
class TPartitionFixture : public NUnitTest::TBaseFixture {
protected:
struct TUserInfoMatcher {
@@ -210,9 +202,6 @@ protected:
ui64 begin, ui64 end,
TMaybe<bool> predicate = Nothing());
- static NKikimrPQ::TPQTabletConfig MakeConfig(ui64 version,
- const TVector<TCreateConsumerParams>& consumers);
-
TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;
@@ -237,8 +226,6 @@ void TPartitionFixture::TearDown(NUnitTest::TTestContext&)
{
}
-
-
void TPartitionFixture::CreatePartitionActor(ui32 id,
const TConfigParams& config,
bool newPartition,
@@ -693,7 +680,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
- auto* body = event->Record.MutableTxBody();
+ auto* body = event->Record.MutableData();
auto* operation = body->MutableOperations()->Add();
operation->SetPartitionId(partition);
operation->SetBegin(begin);
@@ -809,27 +796,6 @@ TTransaction TPartitionFixture::MakeTransaction(ui64 step, ui64 txId,
return TTransaction(event, predicate);
}
-NKikimrPQ::TPQTabletConfig TPartitionFixture::MakeConfig(ui64 version,
- const TVector<TCreateConsumerParams>& consumers)
-{
- NKikimrPQ::TPQTabletConfig config;
-
- config.SetVersion(version);
-
- for (auto& c : consumers) {
- config.AddReadRules(c.Consumer);
- config.AddReadRuleGenerations(c.Generation);
- }
-
- config.SetTopicName("rt3.dc1--account--topic");
- config.SetTopicPath("/Root/PQ/rt3.dc1--account--topic");
- config.SetFederationAccount("account");
- config.SetLocalDC(true);
- config.SetYdbDatabasePath("");
-
- return config;
-}
-
Y_UNIT_TEST_F(Batching, TPartitionFixture)
{
CreatePartition();
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index 773ef094ae2..bcfa18cf72a 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -20,12 +20,11 @@
#include <util/generic/string.h>
#include <util/system/types.h>
+#include "make_config.h"
#include "pqtablet_mock.h"
namespace NKikimr::NPQ {
-Y_UNIT_TEST_SUITE(TPQTabletTests) {
-
namespace NHelpers {
struct TTxOperation {
@@ -36,11 +35,17 @@ struct TTxOperation {
TString Path;
};
+struct TConfigParams {
+ TMaybe<NKikimrPQ::TPQTabletConfig> Tablet;
+ TMaybe<NKikimrPQ::TBootstrapConfig> Bootstrap;
+};
+
struct TProposeTransactionParams {
ui64 TxId = 0;
TVector<ui64> Senders;
TVector<ui64> Receivers;
TVector<TTxOperation> TxOps;
+ TMaybe<TConfigParams> Configs;
};
struct TPlanStepParams {
@@ -66,6 +71,8 @@ using TPQTabletMock = NKikimr::NPQ::NHelpers::TPQTabletMock;
}
+Y_UNIT_TEST_SUITE(TPQTabletTests) {
+
class TPQTabletFixture : public NUnitTest::TBaseFixture {
protected:
struct TProposeTransactionResponseMatcher {
@@ -208,37 +215,43 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
THashSet<ui32> partitions;
- //
- // Source
- //
ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
+ event->Record.SetTxId(params.TxId);
- //
- // TxBody
- //
- auto* body = event->Record.MutableTxBody();
- for (auto& txOp : params.TxOps) {
- auto* operation = body->MutableOperations()->Add();
- operation->SetPartitionId(txOp.Partition);
- operation->SetBegin(txOp.Begin);
- operation->SetEnd(txOp.End);
- operation->SetConsumer(txOp.Consumer);
- operation->SetPath(txOp.Path);
-
- partitions.insert(txOp.Partition);
- }
- for (ui64 tabletId : params.Senders) {
- body->AddSendingShards(tabletId);
- }
- for (ui64 tabletId : params.Receivers) {
- body->AddReceivingShards(tabletId);
+ if (params.Configs) {
+ //
+ // TxBody.Config
+ //
+ auto* body = event->Record.MutableConfig();
+ if (params.Configs->Tablet.Defined()) {
+ *body->MutableTabletConfig() = *params.Configs->Tablet;
+ }
+ if (params.Configs->Bootstrap.Defined()) {
+ *body->MutableBootstrapConfig() = *params.Configs->Bootstrap;
+ }
+ } else {
+ //
+ // TxBody.Data
+ //
+ auto* body = event->Record.MutableData();
+ for (auto& txOp : params.TxOps) {
+ auto* operation = body->MutableOperations()->Add();
+ operation->SetPartitionId(txOp.Partition);
+ operation->SetBegin(txOp.Begin);
+ operation->SetEnd(txOp.End);
+ operation->SetConsumer(txOp.Consumer);
+ operation->SetPath(txOp.Path);
+
+ partitions.insert(txOp.Partition);
+ }
+ for (ui64 tabletId : params.Senders) {
+ body->AddSendingShards(tabletId);
+ }
+ for (ui64 tabletId : params.Receivers) {
+ body->AddReceivingShards(tabletId);
+ }
+ body->SetImmediate(params.Senders.empty() && params.Receivers.empty() && (partitions.size() == 1));
}
- body->SetImmediate(params.Senders.empty() && params.Receivers.empty() && (partitions.size() == 1));
-
- //
- // TxId
- //
- event->Record.SetTxId(params.TxId);
SendToPipe(Ctx->Edge,
event.Release());
@@ -731,6 +744,76 @@ Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture)
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
}
+Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
+{
+ PQTabletPrepare({.partitions=2}, {}, *Ctx);
+
+ const ui64 txId = 67890;
+
+ auto tabletConfig =
+ NHelpers::MakeConfig(2, {
+ {.Consumer="client-1", .Generation=0},
+ {.Consumer="client-3", .Generation=7}},
+ 2);
+
+ SendProposeTransactionRequest({.TxId=txId,
+ .Configs=NHelpers::TConfigParams{
+ .Tablet=tabletConfig,
+ .Bootstrap=NHelpers::MakeBootstrapConfig()
+ }});
+ WaitProposeTransactionResponse({.TxId=txId,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ SendPlanStep({.Step=100, .TxIds={txId}});
+
+ WaitPlanStepAck({.Step=100, .TxIds={txId}});
+ WaitPlanStepAccepted({.Step=100});
+
+ WaitProposeTransactionResponse({.TxId=txId,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
+}
+
+Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
+{
+ PQTabletPrepare({.partitions=2}, {}, *Ctx);
+
+ const ui64 txId_2 = 67891;
+ const ui64 txId_3 = 67892;
+
+ auto tabletConfig =
+ NHelpers::MakeConfig(2, {
+ {.Consumer="client-1", .Generation=1},
+ {.Consumer="client-2", .Generation=1}
+ },
+ 3);
+
+ SendProposeTransactionRequest({.TxId=txId_2,
+ .Configs=NHelpers::TConfigParams{
+ .Tablet=tabletConfig,
+ .Bootstrap=NHelpers::MakeBootstrapConfig()
+ }});
+ SendProposeTransactionRequest({.TxId=txId_3,
+ .TxOps={
+ {.Partition=1, .Consumer="client-2", .Begin=0, .End=0, .Path="/topic"},
+ {.Partition=2, .Consumer="client-1", .Begin=0, .End=0, .Path="/topic"}
+ }});
+
+ WaitProposeTransactionResponse({.TxId=txId_2,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+ WaitProposeTransactionResponse({.TxId=txId_3,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
+
+ SendPlanStep({.Step=100, .TxIds={txId_2, txId_3}});
+
+ WaitPlanStepAck({.Step=100, .TxIds={txId_2, txId_3}});
+ WaitPlanStepAccepted({.Step=100});
+
+ WaitProposeTransactionResponse({.TxId=txId_2,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
+ WaitProposeTransactionResponse({.TxId=txId_3,
+ .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
+}
+
}
}
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index f218896b49f..16675e8d294 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -786,7 +786,7 @@ message TPartitionOperation {
optional string Path = 5; // topic path
};
-message TKqpTransaction {
+message TDataTransaction {
enum ELocksOp {
Unspecified = 0;
Validate = 1;
@@ -802,10 +802,18 @@ message TKqpTransaction {
optional fixed64 LockTxId = 6;
}
+message TConfigTransaction {
+ optional TPQTabletConfig TabletConfig = 2;
+ optional TBootstrapConfig BootstrapConfig = 3;
+}
+
message TEvProposeTransaction {
optional NActorsProto.TActorId Source = 1;
- optional TKqpTransaction TxBody = 2;
- optional uint64 TxId = 3;
+ optional uint64 TxId = 2;
+ oneof TxBody {
+ TDataTransaction Data = 3;
+ TConfigTransaction Config = 4;
+ }
};
message TEvProposeTransactionResult {
@@ -844,6 +852,12 @@ message TEvPeriodicTopicStats {
};
message TTransaction {
+ enum EKind {
+ KIND_UNKNOWN = 0;
+ KIND_DATA = 1;
+ KIND_CONFIG = 2;
+ };
+
enum EState {
UNKNOWN = 0;
PREPARING = 1;
@@ -857,16 +871,27 @@ message TTransaction {
EXECUTED = 9;
};
+ optional EKind Kind = 11;
+ optional uint64 Step = 8;
optional uint64 TxId = 1;
optional EState State = 2;
optional uint64 MinStep = 3;
optional uint64 MaxStep = 4;
+
+ //
+ // TDataTransaction
+ //
repeated uint64 Senders = 5;
repeated uint64 Receivers = 6;
repeated TPartitionOperation Operations = 7;
- optional uint64 Step = 8;
optional bool SelfPredicate = 9; // только предикаты партиций. предикаты коллег отдельно
optional bool AggrPredicate = 10; // заполненено одно из полей Senders or AggrPredicate
+
+ //
+ // TConfigTransaction
+ //
+ optional TPQTabletConfig TabletConfig = 12;
+ optional TBootstrapConfig BootstrapConfig = 13;
};
message TTabletTxInfo {