aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-04-18 15:11:22 +0300
committerabcdef <akotov@ydb.tech>2023-04-18 15:11:22 +0300
commit000d5a64f4c4012c2bb357d59647c681a80f6003 (patch)
tree02dff7b3a502fdfc209e4c90a4ba837697290e10
parentc30a4071eb92a8dfc20a8da12b4f0652b284a926 (diff)
downloadydb-000d5a64f4c4012c2bb357d59647c681a80f6003.tar.gz
distributed transactions for updating the PQ config
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp2
-rw-r--r--ydb/core/persqueue/key.h5
-rw-r--r--ydb/core/persqueue/partition.cpp6
-rw-r--r--ydb/core/persqueue/partition_init.cpp2
-rw-r--r--ydb/core/persqueue/pq_impl.cpp438
-rw-r--r--ydb/core/persqueue/pq_impl.h43
-rw-r--r--ydb/core/persqueue/transaction.cpp191
-rw-r--r--ydb/core/persqueue/transaction.h31
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp2
-rw-r--r--ydb/core/persqueue/ut/pqtablet_ut.cpp20
-rw-r--r--ydb/core/protos/pqconfig.proto15
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp17
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h12
-rw-r--r--ydb/core/tx/schemeshard/ut_pq_reboots.cpp32
14 files changed, 680 insertions, 136 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index e3b1f758207..658142e9f90 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2246,7 +2246,7 @@ private:
}
transaction.SetImmediate(ImmediateTx);
- ActorIdToProto(SelfId(), ev->Record.MutableSource());
+ ActorIdToProto(SelfId(), ev->Record.MutableActor());
ev->Record.MutableData()->Swap(&transaction);
ev->Record.SetTxId(TxId);
diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h
index 760a5ccbcec..8d932b82a0a 100644
--- a/ydb/core/persqueue/key.h
+++ b/ydb/core/persqueue/key.h
@@ -246,6 +246,11 @@ private:
ui16 InternalPartsCount;
};
+inline
+TString GetTxKey(ui64 txId)
+{
+ return Sprintf("tx_%" PRIu64, txId);
+}
}// NPQ
}// NKikimr
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 7c894068c13..a16d9eb6152 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -184,7 +184,7 @@ void TPartition::ReplyPropose(const TActorContext& ctx,
const NKikimrPQ::TEvProposeTransaction& event,
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode)
{
- ctx.Send(ActorIdFromProto(event.GetSource()),
+ ctx.Send(ActorIdFromProto(event.GetActor()),
MakeReplyPropose(event, statusCode).Release());
}
@@ -2722,7 +2722,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event,
const TActorContext& ctx)
{
if (PlanStep.Defined() && TxId.Defined()) {
- if (GetStepAndTxId(event) <= GetStepAndTxId(*PlanStep, *TxId)) {
+ if (GetStepAndTxId(event) < GetStepAndTxId(*PlanStep, *TxId)) {
ctx.Send(Tablet, MakeCommitDone(event.Step, event.TxId).Release());
return;
}
@@ -3246,7 +3246,7 @@ void TPartition::ScheduleReplyError(const ui64 dst,
void TPartition::ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event,
NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode)
{
- Replies.emplace_back(ActorIdFromProto(event.GetSource()),
+ Replies.emplace_back(ActorIdFromProto(event.GetActor()),
MakeReplyPropose(event,
statusCode).Release());
}
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index 185f9f18134..1b60d97841f 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -161,8 +161,6 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
switch (response.GetStatus()) {
case NKikimrProto::OK:
Y_VERIFY(Partition()->Config.ParseFromString(response.GetValue()));
- Y_VERIFY(Partition()->Config.GetVersion() <= Partition()->TabletConfig.GetVersion());
-
if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
Partition()->TabletConfig);
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index f8f8559d99e..228d5077d9c 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -29,6 +29,8 @@ static constexpr ui32 CACHE_SIZE = 100_MB;
static constexpr ui32 MAX_BYTES = 25_MB;
static constexpr ui32 MAX_SOURCE_ID_LENGTH = 10_KB;
+constexpr const auto InvalidTabletId = Max<ui64>();
+
struct TPartitionInfo {
TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange,
const bool initDone, const TTabletCountersBase& baseline)
@@ -584,6 +586,34 @@ private:
/******************************************************* TPersQueue *********************************************************/
+struct TPersQueue::TReplyToActor {
+ using TEventBasePtr = std::unique_ptr<IEventBase>;
+
+ TReplyToActor(const TActorId& actorId, TEventBasePtr event) :
+ ActorId(actorId),
+ Event(std::move(event))
+ {
+ }
+
+ TActorId ActorId;
+ TEventBasePtr Event;
+};
+
+struct TPersQueue::TReplyToPipe {
+ using TEventBasePtr = std::unique_ptr<IEventBase>;
+
+ TReplyToPipe(ui64 tabletId, ui64 txId, TEventBasePtr event) :
+ TabletId(tabletId),
+ TxId(txId),
+ Event(std::move(event))
+ {
+ }
+
+ ui64 TabletId;
+ ui64 TxId;
+ TEventBasePtr Event;
+};
+
void TPersQueue::ReplyError(const TActorContext& ctx, const ui64 responseCookie, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error)
{
ReplyPersQueueError(
@@ -623,7 +653,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
const auto partitionId = partition.GetPartitionId();
if (Partitions.find(partitionId) == Partitions.end()) {
Partitions.emplace(partitionId, TPartitionInfo(
- ctx.Register(CreatePartitionActor(partitionId, Config, true, ctx)),
+ ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, true, ctx)),
GetPartitionKeyRange(partition),
true,
*Counters
@@ -657,12 +687,12 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
TopicName = Config.GetTopicName();
TopicPath = Config.GetTopicPath();
IsLocalDC = Config.GetLocalDC();
- auto& pqConfig = AppData(ctx)->PQConfig;
- TopicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
- pqConfig, "", IsLocalDC
- );
- //ToDo [migration] - check account
- TopicConverter = TopicConverterFactory->MakeTopicConverter(Config);
+
+ CreateTopicConverter(Config,
+ TopicConverterFactory,
+ TopicConverter,
+ ctx);
+
KeySchema.clear();
KeySchema.reserve(Config.PartitionKeySchemaSize());
for (const auto& component : Config.GetPartitionKeySchema()) {
@@ -718,8 +748,11 @@ void TPersQueue::EndWriteConfig(const NKikimrClient::TResponse& resp, const TAct
void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx)
{
- bool ok = (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) && (resp.ReadResultSize() == 2) && (resp.HasSetExecutorFastLogPolicyResult()) &&
- (resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
+ bool ok =
+ (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) &&
+ (resp.ReadResultSize() == 3) &&
+ (resp.HasSetExecutorFastLogPolicyResult()) &&
+ (resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK);
if (!ok) {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
"Tablet " << TabletID() << " Config read error: " << resp.DebugString() << " " << ctx.SelfID);
@@ -727,21 +760,72 @@ void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp,
return;
}
- ReadConfig(resp.GetReadResult(0), ctx);
+ ReadTxInfo(resp.GetReadResult(2), ctx);
+ ReadConfig(resp.GetReadResult(0), resp.GetReadRangeResult(0), ctx);
ReadState(resp.GetReadResult(1), ctx);
}
-void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx)
+void TPersQueue::ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& read,
+ const TActorContext& ctx)
{
+ Y_VERIFY(read.HasStatus());
if (read.GetStatus() != NKikimrProto::OK && read.GetStatus() != NKikimrProto::NODATA) {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
- "Tablet " << TabletID() << " Config read error " << ctx.SelfID);
+ "Tablet " << TabletID() << " tx info read error " << ctx.SelfID);
ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
return;
}
- Y_VERIFY(!ConfigInited);
+ switch (read.GetStatus()) {
+ case NKikimrProto::OK: {
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " has a tx info");
+
+ NKikimrPQ::TTabletTxInfo info;
+ Y_VERIFY(info.ParseFromString(read.GetValue()));
+
+ LastStep = info.GetLastStep();
+ LastTxId = info.GetLastTxId();
+
+ break;
+ }
+ case NKikimrProto::NODATA: {
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " doesn't have tx info");
+
+ LastStep = 0;
+ LastTxId = 0;
+
+ break;
+ }
+ default:
+ Y_FAIL("Unexpected tx info read status: %d", read.GetStatus());
+ }
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " LastStep " << LastStep << " LastTxId " << LastTxId);
+}
+
+void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read,
+ const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
+ const TActorContext& ctx)
+{
Y_VERIFY(read.HasStatus());
+ if (read.GetStatus() != NKikimrProto::OK && read.GetStatus() != NKikimrProto::NODATA) {
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
+ "Tablet " << TabletID() << " Config read error " << ctx.SelfID <<
+ " Error status code " << read.GetStatus());
+ ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
+ return;
+ }
+
+ Y_VERIFY(readRange.HasStatus());
+ if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) {
+ LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
+ "Tablet " << TabletID() << " Transactions read error " << ctx.SelfID <<
+ " Error status code " << readRange.GetStatus());
+ ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
+ return;
+ }
+
+ Y_VERIFY(!ConfigInited);
if (read.GetStatus() == NKikimrProto::OK) {
bool res = Config.ParseFromString(read.GetValue());
@@ -756,12 +840,12 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
TopicName = Config.GetTopicName();
TopicPath = Config.GetTopicPath();
IsLocalDC = Config.GetLocalDC();
- auto& pqConfig = AppData(ctx)->PQConfig;
- TopicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>(
- pqConfig, "", IsLocalDC
- );
- TopicConverter = TopicConverterFactory->MakeTopicConverter(Config);
- Y_VERIFY(TopicConverter->IsValid(), "%s", TopicConverter->GetReason().c_str());
+
+ CreateTopicConverter(Config,
+ TopicConverterFactory,
+ TopicConverter,
+ ctx);
+
KeySchema.clear();
KeySchema.reserve(Config.PartitionKeySchemaSize());
for (const auto& component : Config.GetPartitionKeySchema()) {
@@ -782,15 +866,19 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
Y_FAIL("Unexpected config read status: %d", read.GetStatus());
}
+ THashMap<ui32, TVector<TTransaction>> partitionTxs;
+ InitTransactions(readRange, partitionTxs);
+
for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config
const auto partitionId = partition.GetPartitionId();
Partitions.emplace(partitionId, TPartitionInfo(
- ctx.Register(CreatePartitionActor(partitionId, Config, false, ctx)),
+ ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, false, ctx)),
GetPartitionKeyRange(partition),
false,
*Counters
));
}
+
ConfigInited = true;
InitializeMeteringSink(ctx);
@@ -812,7 +900,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
HasDataRequests.clear();
if (Partitions.empty()) {
- SignalTabletActive(ctx);
+ OnInitComplete(ctx);
}
}
@@ -1093,7 +1181,6 @@ void TPersQueue::Handle(TEvPQ::TEvTabletCacheCounters::TPtr& ev, const TActorCon
<< "Counters. CacheSize " << CacheCounters.CacheSizeBytes << " CachedBlobs " << CacheCounters.CacheSizeBlobs);
}
-
void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& ctx)
{
auto it = Partitions.find(ev->Get()->Partition);
@@ -1104,7 +1191,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
Y_VERIFY(ConfigInited);//partitions are inited only after config
if (PartitionsInited == Partitions.size()) {
- SignalTabletActive(ctx);
+ OnInitComplete(ctx);
}
if (NewConfigShouldBeApplied && PartitionsInited == Partitions.size()) {
@@ -1189,6 +1276,21 @@ void TPersQueue::TrySendUpdateConfigResponses(const TActorContext& ctx)
ChangeConfigNotification.clear();
}
+
+void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
+ NPersQueue::TConverterFactoryPtr& converterFactory,
+ NPersQueue::TTopicConverterPtr& topicConverter,
+ const TActorContext& ctx)
+{
+ auto& pqConfig = AppData(ctx)->PQConfig;
+ converterFactory =
+ std::make_shared<NPersQueue::TTopicNamesConverterFactory>(pqConfig,
+ "",
+ config.GetLocalDC());
+ topicConverter = converterFactory->MakeTopicConverter(config);
+ Y_VERIFY(topicConverter);
+ Y_VERIFY(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str());
+}
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
{
@@ -1739,7 +1841,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
uncompressedSize = 0;
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
- "got client PART message topic: " << TopicConverter->GetClientsideName() << " partition: " << req.GetPartition()
+ "Tablet " << TabletID() <<
+ " got client PART message topic: " << TopicConverter->GetClientsideName() << " partition: " << req.GetPartition()
<< " SourceId: \'" << EscapeC(msgs.back().SourceId) << "\' SeqNo: "
<< msgs.back().SeqNo << " partNo : " << msgs.back().PartNo
<< " messageNo: " << req.GetMessageNo() << " size: " << data.size()
@@ -1755,7 +1858,9 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline()
});
}
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "got client message topic: " << TopicConverter->GetClientsideName() <<
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Tablet " << TabletID() <<
+ " got client message topic: " << TopicConverter->GetClientsideName() <<
" partition: " << req.GetPartition() <<
" SourceId: \'" << EscapeC(msgs.back().SourceId) <<
"\' SeqNo: " << msgs.back().SeqNo << " partNo : " << msgs.back().PartNo <<
@@ -2040,7 +2145,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
ui32 partition = req.GetPartition();
auto it = Partitions.find(partition);
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "got client message batch for topic " << TopicConverter->GetClientsideName() << " partition " << partition << "\n");
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic " << TopicConverter->GetClientsideName() << " partition " << partition << "\n");
if (it == Partitions.end()) {
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::WRONG_PARTITION_NUMBER,
@@ -2144,25 +2249,37 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo
if (PipeClientCache->OnConnect(ev)) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
- "Connected to tablet " << ev->Get()->TabletId << " from tablet " << TabletID());
- } else {
- if (ev->Get()->Dead) {
- //AckRSToDeletedTablet(ev->Get()->TabletId, ctx);
- } else {
- LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE,
- "Failed to connect to tablet " << ev->Get()->TabletId << " from tablet " << TabletID());
- //RestartPipeRS(ev->Get()->TabletId, ctx);
- }
+ "Tablet " << TabletID() <<
+ " Connected to tablet " << ev->Get()->TabletId);
+ return;
}
+
+ RestartPipe(ev->Get()->TabletId, ctx);
}
void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
{
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
- "Client pipe to tablet " << ev->Get()->TabletId << " from " << TabletID() << " is reset");
+ "Tablet " << TabletID() <<
+ " Client pipe to tablet " << ev->Get()->TabletId << " is reset");
PipeClientCache->OnDisconnect(ev);
- //RestartPipeRS(ev->Get()->TabletId, ctx);
+
+ RestartPipe(ev->Get()->TabletId, ctx);
+}
+
+void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx)
+{
+ for (auto& txId: GetBindedTxs(tabletId)) {
+ auto* tx = GetTransaction(ctx, txId);
+ if (!tx) {
+ continue;
+ }
+
+ for (auto& message : tx->GetBindedMsgs(tabletId)) {
+ PipeClientCache->Send(ctx, tabletId, message.Type, message.Data);
+ }
+ }
}
bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx)
@@ -2173,7 +2290,7 @@ bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc
if (ev->Get()->Cgi().Has("kv")) {
return TKeyValueFlat::OnRenderAppHtmlPage(ev, ctx);
}
- LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRemoteHttpInfo: " << ev->Get()->Query);
+ LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvRemoteHttpInfo: " << ev->Get()->Query);
TMap<ui32, TActorId> res;
for (auto& p : Partitions) {
res.insert({p.first, p.second.Actor});
@@ -2244,12 +2361,22 @@ void TPersQueue::Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev, const TActorCont
{
Y_VERIFY(ev->Get()->Node);
DCId = ev->Get()->Node->Location.GetDataCenterId();
-
ResourceMetrics = Executor()->GetResourceMetrics();
+
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
request->Record.SetCookie(READ_CONFIG_COOKIE);
+
request->Record.AddCmdRead()->SetKey(KeyConfig());
request->Record.AddCmdRead()->SetKey(KeyState());
+ request->Record.AddCmdRead()->SetKey(KeyTxInfo());
+
+ auto cmd = request->Record.AddCmdReadRange();
+ cmd->MutableRange()->SetFrom(GetTxKey(Min<ui64>()));
+ cmd->MutableRange()->SetIncludeFrom(true);
+ cmd->MutableRange()->SetTo(GetTxKey(Max<ui64>()));
+ cmd->MutableRange()->SetIncludeTo(true);
+ cmd->SetIncludeData(true);
+
request->Record.MutableCmdSetExecutorFastLogPolicy()
->SetIsAllowed(AppData(ctx)->PQConfig.GetTactic() == NKikimrClient::TKeyValueRequest::MIN_LATENCY);
ctx.Send(ctx.SelfID, request.Release());
@@ -2272,7 +2399,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) {
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
- LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction");
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvProposeTransaction");
NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
switch (event.GetTxBodyCase()) {
@@ -2283,7 +2410,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
HandleConfigTransaction(ev->Release(), ctx);
break;
case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET:
- SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()),
+ SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()),
event.GetTxId(),
ctx);
break;
@@ -2305,7 +2432,8 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
bool isWriteOperation = !operation.HasBegin();
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
- "tx=" << event.GetTxId() <<
+ "Tablet " << TabletID() <<
+ " tx=" << event.GetTxId() <<
", lock_tx_id=" << txBody.GetLockTxId() <<
", path=" << operation.GetPath() <<
", partition=" << operation.GetPartitionId() <<
@@ -2316,7 +2444,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
}
if (TabletState != NKikimrPQ::ENormal) {
- SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()),
+ SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()),
event.GetTxId(),
ctx);
return;
@@ -2384,6 +2512,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
if (tx->State == NKikimrPQ::TTransaction::WAIT_RS) {
CheckTxState(ctx, *tx);
+
+ TryWriteTxs(ctx);
}
} else if (ack) {
//
@@ -2404,9 +2534,12 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorCo
}
tx->OnReadSetAck(event);
+ tx->UnbindMsgsFromPipe(event.GetTabletConsumer());
if (tx->State == NKikimrPQ::TTransaction::EXECUTED) {
CheckTxState(ctx, *tx);
+
+ TryWriteTxs(ctx);
}
}
@@ -2435,6 +2568,8 @@ void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const
return;
}
+ Y_VERIFY(tx->State == NKikimrPQ::TTransaction::CALCULATING);
+
tx->OnProposePartitionConfigResult(event);
CheckTxState(ctx, *tx);
@@ -2451,6 +2586,8 @@ void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& c
return;
}
+ Y_VERIFY(tx->State == NKikimrPQ::TTransaction::EXECUTING);
+
tx->OnTxCommitDone(event);
CheckTxState(ctx, *tx);
@@ -2479,6 +2616,8 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx)
WriteTxsInProgress = true;
ctx.Send(ctx.SelfID, request.Release());
+
+ TryReturnTabletStateAll(ctx);
}
void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp,
@@ -2505,7 +2644,6 @@ void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp,
}
SendReplies(ctx);
- TryReturnTabletStateAll(ctx);
CheckChangedTxStates(ctx);
WriteTxsInProgress = false;
@@ -2529,13 +2667,23 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
const NKikimrPQ::TEvProposeTransaction& event = front->Record;
TDistributedTransaction& tx = Txs[event.GetTxId()];
- if (tx.State != NKikimrPQ::TTransaction::UNKNOWN) {
- continue;
+ switch (tx.State) {
+ case NKikimrPQ::TTransaction::UNKNOWN: {
+ tx.OnProposeTransaction(event, GetAllowedStep());
+ CheckTxState(ctx, tx);
+ break;
+ }
+ case NKikimrPQ::TTransaction::PREPARING: {
+ break;
+ }
+ case NKikimrPQ::TTransaction::PREPARED: {
+ ScheduleProposeTransactionResult(tx);
+ break;
+ }
+ default: {
+ Y_FAIL();
+ }
}
-
- tx.OnProposeTransaction(event, GetAllowedStep());
-
- CheckTxState(ctx, tx);
}
}
@@ -2563,11 +2711,11 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
txAcks[ActorIdFromProto(tx.GetAckTo())].push_back(tx.GetTxId());
}
- if (step > LastStep) {
- ui64 lastTxId = 0;
+ if (step >= LastStep) {
+ ui64 lastPlannedTxId = 0;
for (ui64 txId : txIds) {
- Y_VERIFY(lastTxId < txId);
+ Y_VERIFY(lastPlannedTxId < txId);
if (auto p = Txs.find(txId); p != Txs.end()) {
TDistributedTransaction& tx = p->second;
@@ -2581,27 +2729,31 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
TxQueue.emplace(step, txId);
} else {
LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE,
- "Transaction already planned for step " << tx.Step <<
- ". TabletId: " << TabletID() <<
+ "Tablet " << TabletID() <<
+ " Transaction already planned for step " << tx.Step <<
", Step: " << step <<
", TxId: " << txId);
+
+ if (tx.State > NKikimrPQ::TTransaction::PLANNING) {
+ SendEvProposeTransactionResult(ctx, tx);
+ }
}
} else {
LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE,
- "Unknown transaction " << txId <<
- ". TabletId: " << TabletID() <<
+ "Tablet " << TabletID() <<
+ " Unknown transaction " << txId <<
", Step: " << step);
}
- LastTxId = txId;
+ lastPlannedTxId = txId;
}
LastStep = step;
- LastTxId = lastTxId;
+ LastTxId = lastPlannedTxId;
} else {
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
- "Old plan step " << step <<
- ". TabletId: " << TabletID() <<
+ "Tablet " << TabletID() <<
+ " Old plan step " << step <<
", LastStep: " << LastStep);
}
@@ -2686,27 +2838,31 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction&
event->Record.SetMinStep(tx.MinStep);
event->Record.SetMaxStep(tx.MaxStep);
- Replies.emplace_back(tx.Source, std::move(event));
+ if (tx.SourceTablet != InvalidTabletId) {
+ RepliesToPipe.emplace_back(tx.SourceTablet, tx.TxId, std::move(event));
+ } else {
+ RepliesToActor.emplace_back(tx.SourceActor, std::move(event));
+ }
}
void TPersQueue::SchedulePlanStepAck(ui64 step,
const THashMap<TActorId, TVector<ui64>>& txAcks)
{
- for (auto& [target, txIds] : txAcks) {
+ for (auto& [actorId, txIds] : txAcks) {
auto event = std::make_unique<TEvTxProcessing::TEvPlanStepAck>(TabletID(),
step,
txIds.begin(), txIds.end());
- Replies.emplace_back(target, event.release());
+ RepliesToActor.emplace_back(actorId, std::move(event));
}
}
-void TPersQueue::SchedulePlanStepAccepted(const TActorId& target,
+void TPersQueue::SchedulePlanStepAccepted(const TActorId& actorId,
ui64 step)
{
auto event = std::make_unique<TEvTxProcessing::TEvPlanStepAccepted>(TabletID(), step);
- Replies.emplace_back(target, event.release());
+ RepliesToActor.emplace_back(actorId, std::move(event));
}
void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
@@ -2726,7 +2882,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
TabletID(),
body,
0);
- PipeClientCache->Send(ctx, receiverId, event.release());
+ SendToPipe(receiverId, tx, std::move(event), ctx);
}
tx.ReadSetAcks.clear();
@@ -2800,7 +2956,7 @@ void TPersQueue::SendEvTxRollbackToPartitions(const TActorContext& ctx,
}
void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
- const TDistributedTransaction& tx)
+ TDistributedTransaction& tx)
{
auto result = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
auto status =
@@ -2811,7 +2967,46 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
result->Record.SetTxId(tx.TxId);
result->Record.SetStep(tx.Step);
- ctx.Send(tx.Source, result.release());
+ if (tx.SourceTablet != InvalidTabletId) {
+ SendToPipe(tx.SourceTablet, tx, std::move(result), ctx);
+ } else {
+ ctx.Send(tx.SourceActor, std::move(result));
+ }
+}
+
+void TPersQueue::SendToPipe(ui64 tabletId,
+ TDistributedTransaction& tx,
+ std::unique_ptr<IEventBase> event,
+ const TActorContext& ctx)
+{
+ Y_VERIFY(event);
+
+ BindTxToPipe(tabletId, tx.TxId);
+ tx.BindMsgToPipe(tabletId, *event);
+ PipeClientCache->Send(ctx, tabletId, event.release());
+}
+
+void TPersQueue::BindTxToPipe(ui64 tabletId, ui64 txId)
+{
+ BindedTxs[tabletId].insert(txId);
+}
+
+void TPersQueue::UnbindTxFromPipe(ui64 tabletId, ui64 txId)
+{
+ if (auto p = BindedTxs.find(tabletId); p != BindedTxs.end()) {
+ p->second.erase(txId);
+ }
+}
+
+const THashSet<ui64>& TPersQueue::GetBindedTxs(ui64 tabletId)
+{
+ if (auto p = BindedTxs.find(tabletId); p != BindedTxs.end()) {
+ return p->second;
+ }
+
+ static THashSet<ui64> empty;
+
+ return empty;
}
TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx,
@@ -2820,8 +3015,8 @@ TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx,
auto p = Txs.find(txId);
if (p == Txs.end()) {
LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE,
- "Unknown transaction " << txId <<
- ". TabletId: " << TabletID());
+ "Tablet " << TabletID() <<
+ " Unknown transaction " << txId);
return nullptr;
}
return &p->second;
@@ -2882,10 +3077,18 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
case NKikimrPQ::TTransaction::KIND_DATA:
SendEvTxCalcPredicateToPartitions(ctx, tx);
break;
- case NKikimrPQ::TTransaction::KIND_CONFIG:
- CreateNewPartitions(tx.TabletConfig, ctx);
+ case NKikimrPQ::TTransaction::KIND_CONFIG: {
+ NPersQueue::TConverterFactoryPtr converterFactory;
+ CreateTopicConverter(tx.TabletConfig,
+ converterFactory,
+ tx.TopicConverter,
+ ctx);
+ CreateNewPartitions(tx.TabletConfig,
+ tx.TopicConverter,
+ ctx);
SendEvProposePartitionConfig(ctx, tx);
break;
+ }
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
Y_VERIFY(false);
}
@@ -2908,7 +3111,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
tx.State = NKikimrPQ::TTransaction::CALCULATED;
break;
case NKikimrPQ::TTransaction::KIND_CONFIG:
+ SendEvReadSetToReceivers(ctx, tx);
+
tx.State = NKikimrPQ::TTransaction::WAIT_RS;
+
CheckTxState(ctx, tx);
break;
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
@@ -2928,7 +3134,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
[[fallthrough]];
case NKikimrPQ::TTransaction::WAIT_RS:
- Y_VERIFY(tx.ReadSetAcks.size() <= tx.Senders.size());
+ Y_VERIFY(tx.ReadSetAcks.size() <= tx.Receivers.size());
if (tx.HaveParticipantsDecision()) {
SendEvProposeTransactionResult(ctx, tx);
@@ -2969,12 +3175,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
tx.State = NKikimrPQ::TTransaction::EXECUTED;
TxQueue.pop();
- if (!TxQueue.empty()) {
- auto next = GetTransaction(ctx, TxQueue.front().second);
- Y_VERIFY(next);
-
- CheckTxState(ctx, *next);
- }
+ TryStartTransaction(ctx);
} else {
break;
}
@@ -3006,10 +3207,26 @@ void TPersQueue::DeleteTx(TDistributedTransaction& tx)
void TPersQueue::SendReplies(const TActorContext& ctx)
{
- for (auto& [actorId, event] : Replies) {
+ SendRepliesToActors(ctx);
+ SendRepliesToPipes(ctx);
+}
+
+void TPersQueue::SendRepliesToActors(const TActorContext& ctx)
+{
+ for (auto& [actorId, event] : RepliesToActor) {
ctx.Send(actorId, event.release());
}
- Replies.clear();
+ RepliesToActor.clear();
+}
+
+void TPersQueue::SendRepliesToPipes(const TActorContext& ctx)
+{
+ for (auto& [tabletId, txId, event] : RepliesToPipe) {
+ auto* tx = GetTransaction(ctx, txId);
+ Y_VERIFY(tx);
+ SendToPipe(tabletId, *tx, std::move(event), ctx);
+ }
+ RepliesToPipe.clear();
}
void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
@@ -3047,7 +3264,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
for (auto& [_, partition] : Partitions) {
auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId);
- event->TopicConverter = TopicConverter;
+ event->TopicConverter = tx.TopicConverter;
event->Config = tx.TabletConfig;
ctx.Send(partition.Actor, std::move(event));
@@ -3058,6 +3275,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
}
TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId,
+ const NPersQueue::TTopicConverterPtr topicConverter,
const NKikimrPQ::TPQTabletConfig& config,
bool newPartition,
const TActorContext& ctx)
@@ -3066,7 +3284,7 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId,
partitionId,
ctx.SelfID,
CacheActor,
- TopicConverter,
+ topicConverter,
IsLocalDC,
DCId,
IsServerless,
@@ -3077,6 +3295,7 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId,
}
void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
+ NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx)
{
EnsurePartitionsAreNotDeleted(config);
@@ -3095,7 +3314,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
continue;
}
- TActorId actorId = ctx.Register(CreatePartitionActor(partitionId, config, true, ctx));
+ TActorId actorId = ctx.Register(CreatePartitionActor(partitionId, topicConverter, config, true, ctx));
Partitions.emplace(std::piecewise_construct,
std::forward_as_tuple(partitionId),
@@ -3126,6 +3345,55 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig&
Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId());
}
}
+
+void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
+ THashMap<ui32, TVector<TTransaction>>& partitionTxs)
+{
+ Txs.clear();
+ TxQueue.clear();
+
+ std::deque<std::pair<ui64, ui64>> plannedTxs;
+
+ for (size_t i = 0; i < readRange.PairSize(); ++i) {
+ auto& pair = readRange.GetPair(i);
+
+ NKikimrPQ::TTransaction tx;
+ Y_VERIFY(tx.ParseFromString(pair.GetValue()));
+
+ Txs.emplace(tx.GetTxId(), tx);
+
+ if (tx.HasStep()) {
+ if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(LastStep, LastTxId)) {
+ plannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
+ }
+ }
+ }
+
+ std::sort(plannedTxs.begin(), plannedTxs.end());
+ for (auto& item : plannedTxs) {
+ TxQueue.push(item);
+ }
+
+ Y_UNUSED(partitionTxs);
+}
+
+void TPersQueue::TryStartTransaction(const TActorContext& ctx)
+{
+ if (TxQueue.empty()) {
+ return;
+ }
+
+ auto next = GetTransaction(ctx, TxQueue.front().second);
+ Y_VERIFY(next);
+
+ CheckTxState(ctx, *next);
+}
+
+void TPersQueue::OnInitComplete(const TActorContext& ctx)
+{
+ SignalTabletActive(ctx);
+ TryStartTransaction(ctx);
+}
ui64 TPersQueue::GetAllowedStep() const
{
diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h
index 569121c7f90..ac21b3834e9 100644
--- a/ydb/core/persqueue/pq_impl.h
+++ b/ydb/core/persqueue/pq_impl.h
@@ -22,6 +22,8 @@ struct TChangeNotification;
class TResponseBuilder;
class TPartition;
+struct TTransaction;
+
//USES MAIN chanel for big blobs, INLINE or EXTRA for ZK-like load, EXTRA2 for small blob for logging (VDISK of type LOG is ok with EXTRA2)
class TPersQueue : public NKeyValue::TKeyValueFlat {
@@ -99,7 +101,11 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
const TActorContext& ctx);
void HandleStateWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx);
- void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx);
+ void ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& read,
+ const TActorContext& ctx);
+ void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read,
+ const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
+ const TActorContext& ctx);
void ReadState(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx);
void InitializeMeteringSink(const TActorContext& ctx);
@@ -113,6 +119,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
NPersQueue::NErrorCode::EErrorCode& code, TString& error) const;
void TrySendUpdateConfigResponses(const TActorContext& ctx);
+ static void CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
+ NPersQueue::TConverterFactoryPtr& converterFactory,
+ NPersQueue::TTopicConverterPtr& topicConverter,
+ const TActorContext& ctx);
//client request
void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx);
@@ -227,7 +237,12 @@ private:
TMaybe<NKikimrPQ::TPQTabletConfig> TabletConfigTx;
TMaybe<NKikimrPQ::TBootstrapConfig> BootstrapConfigTx;
bool WriteTxsInProgress = false;
- TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies;
+
+ struct TReplyToActor;
+ struct TReplyToPipe;
+
+ TVector<TReplyToActor> RepliesToActor;
+ TVector<TReplyToPipe> RepliesToPipe;
TIntrusivePtr<NTabletPipe::TBoundedClientCacheConfig> PipeClientCacheConfig;
THolder<NTabletPipe::IClientCache> PipeClientCache;
@@ -262,7 +277,7 @@ private:
void SendEvTxRollbackToPartitions(const TActorContext& ctx,
TDistributedTransaction& tx);
void SendEvProposeTransactionResult(const TActorContext& ctx,
- const TDistributedTransaction& tx);
+ TDistributedTransaction& tx);
TDistributedTransaction* GetTransaction(const TActorContext& ctx,
ui64 txId);
@@ -274,6 +289,8 @@ private:
void DeleteTx(TDistributedTransaction& tx);
void SendReplies(const TActorContext& ctx);
+ void SendRepliesToActors(const TActorContext& ctx);
+ void SendRepliesToPipes(const TActorContext& ctx);
void CheckChangedTxStates(const TActorContext& ctx);
bool AllTransactionsHaveBeenProcessed() const;
@@ -296,10 +313,12 @@ private:
TDistributedTransaction& tx);
TPartition* CreatePartitionActor(ui32 partitionId,
+ const NPersQueue::TTopicConverterPtr topicConverter,
const NKikimrPQ::TPQTabletConfig& config,
bool newPartition,
const TActorContext& ctx);
void CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
+ NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx);
void EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const;
@@ -314,6 +333,24 @@ private:
const TActorContext& ctx);
void ClearNewConfig();
+
+ void SendToPipe(ui64 tabletId,
+ TDistributedTransaction& tx,
+ std::unique_ptr<IEventBase> event,
+ const TActorContext& ctx);
+
+ void InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
+ THashMap<ui32, TVector<TTransaction>>& partitionTxs);
+ void TryStartTransaction(const TActorContext& ctx);
+ void OnInitComplete(const TActorContext& ctx);
+
+ void RestartPipe(ui64 tabletId, const TActorContext& ctx);
+
+ void BindTxToPipe(ui64 tabletId, ui64 txId);
+ void UnbindTxFromPipe(ui64 tabletId, ui64 txId);
+ const THashSet<ui64>& GetBindedTxs(ui64 tabletId);
+
+ THashMap<ui64, THashSet<ui64>> BindedTxs;
};
diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp
index 1e7bb541152..80a428f650a 100644
--- a/ydb/core/persqueue/transaction.cpp
+++ b/ydb/core/persqueue/transaction.cpp
@@ -1,32 +1,134 @@
#include "transaction.h"
namespace NKikimr::NPQ {
+
+TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& tx) :
+ TDistributedTransaction()
+{
+ Kind = tx.GetKind();
+ if (tx.HasStep()) {
+ Step = tx.GetStep();
+ }
+ TxId = tx.GetTxId();
+ State = tx.GetState();
+ MinStep = tx.GetMinStep();
+ MaxStep = tx.GetMaxStep();
+
+ switch (Kind) {
+ case NKikimrPQ::TTransaction::KIND_DATA:
+ InitDataTransaction(tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_CONFIG:
+ InitConfigTransaction(tx);
+ break;
+ case NKikimrPQ::TTransaction::KIND_UNKNOWN:
+ Y_FAIL_S("unknown transaction type");
+ }
+
+ if (tx.HasSelfPredicate()) {
+ SelfDecision =
+ tx.GetSelfPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
+ }
+ if (tx.HasAggrPredicate()) {
+ ParticipantsDecision =
+ tx.GetAggrPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
+ }
+
+ if (tx.HasTablet()) {
+ SourceTablet = tx.GetTablet();
+ } else {
+ Y_VERIFY(tx.HasActor());
+ SourceActor = ActorIdFromProto(tx.GetActor());
+ }
+}
+
+void TDistributedTransaction::InitDataTransaction(const NKikimrPQ::TTransaction& tx)
+{
+ for (ui64 tabletId : tx.GetSenders()) {
+ Senders.insert(tabletId);
+ }
+ for (ui64 tabletId : tx.GetReceivers()) {
+ Receivers.insert(tabletId);
+ }
+
+ InitPartitions(tx.GetOperations());
+}
+
+void TDistributedTransaction::InitPartitions(const google::protobuf::RepeatedPtrField<NKikimrPQ::TPartitionOperation>& operations)
+{
+ Partitions.clear();
+
+ for (auto& o : operations) {
+ Operations.push_back(o);
+ Partitions.insert(o.GetPartitionId());
+ }
+}
+
+void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransaction& tx)
+{
+ Y_VERIFY(tx.HasSchemeShardId());
+
+ Receivers.insert(tx.GetSchemeShardId());
+
+ TabletConfig = tx.GetTabletConfig();
+ BootstrapConfig = tx.GetBootstrapConfig();
+
+ InitPartitions(TabletConfig);
+}
+
+void TDistributedTransaction::InitPartitions(const NKikimrPQ::TPQTabletConfig& config)
+{
+ Partitions.clear();
+
+ if (config.PartitionsSize()) {
+ for (const auto& partition : config.GetPartitions()) {
+ Partitions.insert(partition.GetPartitionId());
+ }
+ } else {
+ for (auto partitionId : config.GetPartitionIds()) {
+ Partitions.insert(partitionId);
+ }
+ }
+}
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event,
ui64 minStep)
{
Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET);
+ Y_VERIFY(event.GetSourceCase() != NKikimrPQ::TEvProposeTransaction::SOURCE_NOT_SET);
Y_VERIFY(TxId == Max<ui64>());
TxId = event.GetTxId();
MinStep = minStep;
- MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds();
switch (event.GetTxBodyCase()) {
case NKikimrPQ::TEvProposeTransaction::kData:
Y_VERIFY(event.HasData());
+ MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds();
OnProposeTransaction(event.GetData());
break;
case NKikimrPQ::TEvProposeTransaction::kConfig:
Y_VERIFY(event.HasConfig());
+ MaxStep = Max<ui64>();
OnProposeTransaction(event.GetConfig());
break;
- case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET:
- break;
+ default:
+ Y_FAIL_S("unknown TxBody case");
}
- Source = ActorIdFromProto(event.GetSource());
+ switch (event.GetSourceCase()) {
+ case NKikimrPQ::TEvProposeTransaction::kActor:
+ Y_VERIFY(event.HasActor());
+ SourceActor = ActorIdFromProto(event.GetActor());
+ break;
+ case NKikimrPQ::TEvProposeTransaction::kTablet:
+ Y_VERIFY(event.HasTablet());
+ SourceTablet = event.GetTablet();
+ break;
+ default:
+ Y_FAIL_S("unknown Source case");
+ }
}
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody)
@@ -41,10 +143,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
Receivers.insert(tablet);
}
- for (auto& operation : txBody.GetOperations()) {
- Operations.push_back(operation);
- Partitions.insert(operation.GetPartitionId());
- }
+ InitPartitions(txBody.GetOperations());
PartitionRepliesCount = 0;
PartitionRepliesExpected = 0;
@@ -54,20 +153,16 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody)
{
+ Y_VERIFY(txBody.HasSchemeShardId());
+
Kind = NKikimrPQ::TTransaction::KIND_CONFIG;
+ Receivers.insert(txBody.GetSchemeShardId());
+
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();
- if (TabletConfig.PartitionsSize()) {
- for (const auto& partition : TabletConfig.GetPartitions()) {
- Partitions.insert(partition.GetPartitionId());
- }
- } else {
- for (auto partitionId : TabletConfig.GetPartitionIds()) {
- Partitions.insert(partitionId);
- }
- }
+ InitPartitions(TabletConfig);
PartitionRepliesCount = 0;
PartitionRepliesExpected = 0;
@@ -85,22 +180,25 @@ void TDistributedTransaction::OnPlanStep(ui64 step)
void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event)
{
- Y_VERIFY(Step == event.Step);
- Y_VERIFY(TxId == event.TxId);
-
- Y_VERIFY(Partitions.contains(event.Partition));
-
- SetDecision(event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
-
- ++PartitionRepliesCount;
+ OnPartitionResult(event,
+ event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
}
void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event)
{
+ OnPartitionResult(event,
+ NKikimrTx::TReadSetData::DECISION_COMMIT);
+}
+
+template<class E>
+void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decision)
+{
Y_VERIFY(Step == event.Step);
Y_VERIFY(TxId == event.TxId);
- SetDecision(NKikimrTx::TReadSetData::DECISION_COMMIT);
+ Y_VERIFY(Partitions.contains(event.Partition));
+
+ SetDecision(decision);
++PartitionRepliesCount;
}
@@ -206,7 +304,14 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque
AddCmdWriteConfigTx(tx);
break;
case NKikimrPQ::TTransaction::KIND_UNKNOWN:
- break;
+ Y_FAIL_S("unknown transaction type");
+ }
+
+ if (SourceTablet != Max<ui64>()) {
+ tx.SetTablet(SourceTablet);
+ } else {
+ Y_VERIFY(SourceActor != TActorId());
+ ActorIdToProto(SourceActor, tx.MutableActor());
}
TString value;
@@ -236,6 +341,10 @@ void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx)
void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx)
{
+ Y_VERIFY(Receivers.size() == 1);
+
+ tx.SetSchemeShardId(*Receivers.begin());
+
*tx.MutableTabletConfig() = TabletConfig;
*tx.MutableBootstrapConfig() = BootstrapConfig;
}
@@ -259,7 +368,33 @@ void TDistributedTransaction::SetDecision(NKikimrTx::TReadSetData::EDecision& va
TString TDistributedTransaction::GetKey() const
{
- return Sprintf("tx_%lu", TxId);
+ return GetTxKey(TxId);
+}
+
+void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event)
+{
+ Y_VERIFY(event.IsSerializable());
+
+ TAllocChunkSerializer serializer;
+ Y_VERIFY(event.SerializeToArcadiaStream(&serializer));
+ auto data = serializer.Release(event.CreateSerializationInfo());
+ OutputMsgs[tabletId].emplace_back(event.Type(), std::move(data));
+}
+
+void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId)
+{
+ OutputMsgs.erase(tabletId);
+}
+
+auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>&
+{
+ if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) {
+ return p->second;
+ }
+
+ static TVector<TSerializedMessage> empty;
+
+ return empty;
}
}
diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h
index cbe3d787aaa..ab58a4b7a5f 100644
--- a/ydb/core/persqueue/transaction.h
+++ b/ydb/core/persqueue/transaction.h
@@ -17,6 +17,7 @@ namespace NKikimr::NPQ {
struct TDistributedTransaction {
TDistributedTransaction() = default;
+ explicit TDistributedTransaction(const NKikimrPQ::TTransaction& tx);
void OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event,
ui64 minStep);
@@ -47,7 +48,8 @@ struct TDistributedTransaction {
EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
- NActors::TActorId Source; // отправитель TEvProposeTransaction
+ NActors::TActorId SourceActor; // отправитель TEvProposeTransaction
+ ui64 SourceTablet = Max<ui64>();
THashSet<ui32> Partitions; // список участвующих партиций
size_t PartitionRepliesCount = 0;
@@ -59,6 +61,7 @@ struct TDistributedTransaction {
NKikimrPQ::TPQTabletConfig TabletConfig;
NKikimrPQ::TBootstrapConfig BootstrapConfig;
+ NPersQueue::TTopicConverterPtr TopicConverter;
bool WriteInProgress = false;
@@ -79,6 +82,32 @@ struct TDistributedTransaction {
void AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx);
void AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx);
+
+ void InitDataTransaction(const NKikimrPQ::TTransaction& tx);
+ void InitConfigTransaction(const NKikimrPQ::TTransaction& tx);
+
+ void InitPartitions(const google::protobuf::RepeatedPtrField<NKikimrPQ::TPartitionOperation>& tx);
+ void InitPartitions(const NKikimrPQ::TPQTabletConfig& config);
+
+ template<class E>
+ void OnPartitionResult(const E& event, EDecision decision);
+
+ struct TSerializedMessage {
+ ui32 Type;
+ TIntrusivePtr<TEventSerializedData> Data;
+
+ TSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data) :
+ Type(type),
+ Data(data)
+ {
+ }
+ };
+
+ THashMap<ui64, TVector<TSerializedMessage>> OutputMsgs;
+
+ void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
+ void UnbindMsgsFromPipe(ui64 tabletId);
+ const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
};
}
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index 62b89f4d0b7..16df7691622 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -731,7 +731,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition,
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
- ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
+ ActorIdToProto(Ctx->Edge, event->Record.MutableActor());
auto* body = event->Record.MutableData();
auto* operation = body->MutableOperations()->Add();
operation->SetPartitionId(partition);
diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp
index bcfa18cf72a..1013ccfd26f 100644
--- a/ydb/core/persqueue/ut/pqtablet_ut.cpp
+++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp
@@ -38,6 +38,7 @@ struct TTxOperation {
struct TConfigParams {
TMaybe<NKikimrPQ::TPQTabletConfig> Tablet;
TMaybe<NKikimrPQ::TBootstrapConfig> Bootstrap;
+ ui64 SchemeShardId = 0;
};
struct TProposeTransactionParams {
@@ -215,7 +216,7 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
THashSet<ui32> partitions;
- ActorIdToProto(Ctx->Edge, event->Record.MutableSource());
+ ActorIdToProto(Ctx->Edge, event->Record.MutableActor());
event->Record.SetTxId(params.TxId);
if (params.Configs) {
@@ -223,6 +224,9 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
// TxBody.Config
//
auto* body = event->Record.MutableConfig();
+ if (params.Configs->SchemeShardId) {
+ body->SetSchemeShardId(params.Configs->SchemeShardId);
+ }
if (params.Configs->Tablet.Defined()) {
*body->MutableTabletConfig() = *params.Configs->Tablet;
}
@@ -746,6 +750,7 @@ Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture)
Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
{
+ NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222);
PQTabletPrepare({.partitions=2}, {}, *Ctx);
const ui64 txId = 67890;
@@ -759,7 +764,8 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
SendProposeTransactionRequest({.TxId=txId,
.Configs=NHelpers::TConfigParams{
.Tablet=tabletConfig,
- .Bootstrap=NHelpers::MakeBootstrapConfig()
+ .Bootstrap=NHelpers::MakeBootstrapConfig(),
+ .SchemeShardId = 22222
}});
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
@@ -769,12 +775,16 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture)
WaitPlanStepAck({.Step=100, .TxIds={txId}});
WaitPlanStepAccepted({.Step=100});
+ WaitReadSet(*schemeshard, {.Step=100, .TxId=txId, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId});
+ schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId});
+
WaitProposeTransactionResponse({.TxId=txId,
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
}
Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
{
+ NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222);
PQTabletPrepare({.partitions=2}, {}, *Ctx);
const ui64 txId_2 = 67891;
@@ -790,7 +800,8 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
SendProposeTransactionRequest({.TxId=txId_2,
.Configs=NHelpers::TConfigParams{
.Tablet=tabletConfig,
- .Bootstrap=NHelpers::MakeBootstrapConfig()
+ .Bootstrap=NHelpers::MakeBootstrapConfig(),
+ .SchemeShardId = 22222
}});
SendProposeTransactionRequest({.TxId=txId_3,
.TxOps={
@@ -808,6 +819,9 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture)
WaitPlanStepAck({.Step=100, .TxIds={txId_2, txId_3}});
WaitPlanStepAccepted({.Step=100});
+ WaitReadSet(*schemeshard, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId});
+ schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId});
+
WaitProposeTransactionResponse({.TxId=txId_2,
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
WaitProposeTransactionResponse({.TxId=txId_3,
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 9caedbecdca..3dc3dc5bc27 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -826,7 +826,10 @@ message TConfigTransaction {
}
message TEvProposeTransaction {
- optional NActorsProto.TActorId Source = 1;
+ oneof Source {
+ NActorsProto.TActorId Actor = 1;
+ uint64 Tablet = 5;
+ }
optional uint64 TxId = 2;
oneof TxBody {
TDataTransaction Data = 3;
@@ -909,7 +912,7 @@ message TTransaction {
repeated uint64 Receivers = 6;
repeated TPartitionOperation Operations = 7;
optional bool SelfPredicate = 9; // только предикаты партиций. предикаты коллег отдельно
- optional bool AggrPredicate = 10; // заполненено одно из полей Senders or AggrPredicate
+ optional bool AggrPredicate = 10;
//
// TConfigTransaction
@@ -917,6 +920,14 @@ message TTransaction {
optional TPQTabletConfig TabletConfig = 12;
optional TBootstrapConfig BootstrapConfig = 13;
optional uint64 SchemeShardId = 16;
+
+ //
+ // получатель результата
+ //
+ oneof Source {
+ NActorsProto.TActorId Actor = 14;
+ uint64 Tablet = 15;
+ }
};
message TTabletTxInfo {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 3da9bafe66f..9b266eb70f0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -605,7 +605,7 @@ bool CollectSchemaChanged(const TOperationId& operationId,
const auto& evRecord = ev->Get()->Record;
if (evRecord.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::COMPLETE) {
const auto ssId = context.SS->SelfTabletId();
- const auto shardId = TTabletId(evRecord.GetOrigin());
+ const TTabletId shardId(evRecord.GetOrigin());
const auto shardIdx = context.SS->MustGetShardIdx(shardId);
Y_VERIFY(context.SS->ShardInfos.contains(shardIdx));
@@ -649,10 +649,12 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
- TTabletId ssId)
+ TTabletId ssId,
+ const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>();
event->Record.SetTxId(ui64(txId));
+ event->Record.SetTablet(ui64(ssId));
event->Record.MutableConfig()->SetSchemeShardId(ui64(ssId));
MakePQTabletConfig(*event->Record.MutableConfig()->MutableTabletConfig(),
@@ -668,6 +670,10 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans
pqGroup,
txType);
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Propose configure PersQueue" <<
+ ", message: " << event->Record.ShortUtf8DebugString());
+
return event;
}
@@ -680,7 +686,8 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
- TTxState::ETxType txType)
+ TTxState::ETxType txType,
+ const TOperationContext& context)
{
auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>();
event->Record.SetTxId(ui64(txId));
@@ -698,6 +705,10 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId
pqGroup,
txType);
+ LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Propose configure PersQueue" <<
+ ", message: " << event->Record.ShortUtf8DebugString());
+
return event;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index a0eae5139e6..b9d2b83e5a3 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -686,7 +686,8 @@ public:
databaseId,
databasePath,
txState->TxType,
- ssId);
+ ssId,
+ context);
} else {
event = MakeEvUpdateConfig(OperationId.GetTxId(),
*pqGroup,
@@ -697,7 +698,8 @@ public:
folderId,
databaseId,
databasePath,
- txState->TxType);
+ txState->TxType,
+ context);
}
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -849,7 +851,8 @@ private:
const TString& databaseId,
const TString& databasePath,
TTxState::ETxType txType,
- TTabletId ssId);
+ TTabletId ssId,
+ const TOperationContext& context);
static THolder<TEvPersQueue::TEvUpdateConfig>
MakeEvUpdateConfig(TTxId txId,
const TTopicInfo& pqGroup,
@@ -860,7 +863,8 @@ private:
const TString& folderId,
const TString& databaseId,
const TString& databasePath,
- TTxState::ETxType txType);
+ TTxState::ETxType txType,
+ const TOperationContext& context);
};
class TPropose: public TSubOperationState {
diff --git a/ydb/core/tx/schemeshard/ut_pq_reboots.cpp b/ydb/core/tx/schemeshard/ut_pq_reboots.cpp
index 86278552974..d980a16280b 100644
--- a/ydb/core/tx/schemeshard/ut_pq_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_pq_reboots.cpp
@@ -20,7 +20,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
Y_UNIT_TEST(Create) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ t.Runtime->SetScheduledLimit(400);
+
TestCreatePQGroup(runtime, ++t.TxId, "/MyRoot/DirA",
"Name: \"PQGroup_2\""
"TotalGroupCount: 10 "
@@ -51,7 +55,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
Y_UNIT_TEST(CreateMultiplePqTablets) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ t.Runtime->SetScheduledLimit(400);
+
TestCreatePQGroup(runtime, ++t.TxId, "/MyRoot/DirA",
"Name: \"PQGroup_2\""
"TotalGroupCount: 2 "
@@ -140,7 +148,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
Y_UNIT_TEST(AlterWithReboots) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ t.Runtime->SetScheduledLimit(400);
+
TPathVersion pqVer;
{
TInactiveZone inactive(activeZone);
@@ -201,7 +213,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
Y_UNIT_TEST(CreateAlter) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ t.Runtime->SetScheduledLimit(400);
+
t.RestoreLogging();
AsyncCreatePQGroup(runtime, t.TxId++, "/MyRoot/DirA",
@@ -250,7 +266,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
Y_UNIT_TEST(CreateDrop) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& /*activeZone*/) {
+ t.Runtime->SetScheduledLimit(400);
+
t.RestoreLogging();
TestCreatePQGroup(runtime, t.TxId++, "/MyRoot/DirA", GroupConfig);
@@ -284,7 +304,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
Y_UNIT_TEST(CreateDropAbort) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& /*activeZone*/) {
+ t.Runtime->SetScheduledLimit(400);
+
t.RestoreLogging();
ui64& txId = t.TxId;
@@ -310,7 +334,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
//Handle(): requirement std::make_pair(msg->CollectGeneration, msg->CollectStep) >= barrier.MakeCollectPair() failed
/*Y_UNIT_TEST(CreateAlterAlterDrop) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ t.Runtime->SetScheduledLimit(400);
+
t.RestoreLogging();
ui64& txId = t.TxId;
@@ -337,7 +365,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) {
Y_UNIT_TEST(CreateAlterDropPqGroupWithReboots) {
TTestWithReboots t;
+ t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true);
+
t.Run([&](TTestActorRuntime& runtime, bool& /*activeZone*/) {
+ t.Runtime->SetScheduledLimit(400);
+
using ESts = NKikimrScheme::EStatus;
t.RestoreLogging();