diff options
author | abcdef <akotov@ydb.tech> | 2023-04-18 15:11:22 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-04-18 15:11:22 +0300 |
commit | 000d5a64f4c4012c2bb357d59647c681a80f6003 (patch) | |
tree | 02dff7b3a502fdfc209e4c90a4ba837697290e10 | |
parent | c30a4071eb92a8dfc20a8da12b4f0652b284a926 (diff) | |
download | ydb-000d5a64f4c4012c2bb357d59647c681a80f6003.tar.gz |
distributed transactions for updating the PQ config
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/key.h | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 6 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_init.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 438 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 43 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 191 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 31 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 20 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 15 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_common.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_common.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_pq_reboots.cpp | 32 |
14 files changed, 680 insertions, 136 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index e3b1f758207..658142e9f90 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2246,7 +2246,7 @@ private: } transaction.SetImmediate(ImmediateTx); - ActorIdToProto(SelfId(), ev->Record.MutableSource()); + ActorIdToProto(SelfId(), ev->Record.MutableActor()); ev->Record.MutableData()->Swap(&transaction); ev->Record.SetTxId(TxId); diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h index 760a5ccbcec..8d932b82a0a 100644 --- a/ydb/core/persqueue/key.h +++ b/ydb/core/persqueue/key.h @@ -246,6 +246,11 @@ private: ui16 InternalPartsCount; }; +inline +TString GetTxKey(ui64 txId) +{ + return Sprintf("tx_%" PRIu64, txId); +} }// NPQ }// NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 7c894068c13..a16d9eb6152 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -184,7 +184,7 @@ void TPartition::ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode) { - ctx.Send(ActorIdFromProto(event.GetSource()), + ctx.Send(ActorIdFromProto(event.GetActor()), MakeReplyPropose(event, statusCode).Release()); } @@ -2722,7 +2722,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event, const TActorContext& ctx) { if (PlanStep.Defined() && TxId.Defined()) { - if (GetStepAndTxId(event) <= GetStepAndTxId(*PlanStep, *TxId)) { + if (GetStepAndTxId(event) < GetStepAndTxId(*PlanStep, *TxId)) { ctx.Send(Tablet, MakeCommitDone(event.Step, event.TxId).Release()); return; } @@ -3246,7 +3246,7 @@ void TPartition::ScheduleReplyError(const ui64 dst, void TPartition::ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode) { - Replies.emplace_back(ActorIdFromProto(event.GetSource()), + Replies.emplace_back(ActorIdFromProto(event.GetActor()), MakeReplyPropose(event, statusCode).Release()); } diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 185f9f18134..1b60d97841f 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -161,8 +161,6 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon switch (response.GetStatus()) { case NKikimrProto::OK: Y_VERIFY(Partition()->Config.ParseFromString(response.GetValue())); - Y_VERIFY(Partition()->Config.GetVersion() <= Partition()->TabletConfig.GetVersion()); - if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) { auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter, Partition()->TabletConfig); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index f8f8559d99e..228d5077d9c 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -29,6 +29,8 @@ static constexpr ui32 CACHE_SIZE = 100_MB; static constexpr ui32 MAX_BYTES = 25_MB; static constexpr ui32 MAX_SOURCE_ID_LENGTH = 10_KB; +constexpr const auto InvalidTabletId = Max<ui64>(); + struct TPartitionInfo { TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange, const bool initDone, const TTabletCountersBase& baseline) @@ -584,6 +586,34 @@ private: /******************************************************* TPersQueue *********************************************************/ +struct TPersQueue::TReplyToActor { + using TEventBasePtr = std::unique_ptr<IEventBase>; + + TReplyToActor(const TActorId& actorId, TEventBasePtr event) : + ActorId(actorId), + Event(std::move(event)) + { + } + + TActorId ActorId; + TEventBasePtr Event; +}; + +struct TPersQueue::TReplyToPipe { + using TEventBasePtr = std::unique_ptr<IEventBase>; + + TReplyToPipe(ui64 tabletId, ui64 txId, TEventBasePtr event) : + TabletId(tabletId), + TxId(txId), + Event(std::move(event)) + { + } + + ui64 TabletId; + ui64 TxId; + TEventBasePtr Event; +}; + void TPersQueue::ReplyError(const TActorContext& ctx, const ui64 responseCookie, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) { ReplyPersQueueError( @@ -623,7 +653,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) const auto partitionId = partition.GetPartitionId(); if (Partitions.find(partitionId) == Partitions.end()) { Partitions.emplace(partitionId, TPartitionInfo( - ctx.Register(CreatePartitionActor(partitionId, Config, true, ctx)), + ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, true, ctx)), GetPartitionKeyRange(partition), true, *Counters @@ -657,12 +687,12 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, TopicName = Config.GetTopicName(); TopicPath = Config.GetTopicPath(); IsLocalDC = Config.GetLocalDC(); - auto& pqConfig = AppData(ctx)->PQConfig; - TopicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>( - pqConfig, "", IsLocalDC - ); - //ToDo [migration] - check account - TopicConverter = TopicConverterFactory->MakeTopicConverter(Config); + + CreateTopicConverter(Config, + TopicConverterFactory, + TopicConverter, + ctx); + KeySchema.clear(); KeySchema.reserve(Config.PartitionKeySchemaSize()); for (const auto& component : Config.GetPartitionKeySchema()) { @@ -718,8 +748,11 @@ void TPersQueue::EndWriteConfig(const NKikimrClient::TResponse& resp, const TAct void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx) { - bool ok = (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) && (resp.ReadResultSize() == 2) && (resp.HasSetExecutorFastLogPolicyResult()) && - (resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK); + bool ok = + (resp.GetStatus() == NMsgBusProxy::MSTATUS_OK) && + (resp.ReadResultSize() == 3) && + (resp.HasSetExecutorFastLogPolicyResult()) && + (resp.GetSetExecutorFastLogPolicyResult().GetStatus() == NKikimrProto::OK); if (!ok) { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Config read error: " << resp.DebugString() << " " << ctx.SelfID); @@ -727,21 +760,72 @@ void TPersQueue::HandleConfigReadResponse(const NKikimrClient::TResponse& resp, return; } - ReadConfig(resp.GetReadResult(0), ctx); + ReadTxInfo(resp.GetReadResult(2), ctx); + ReadConfig(resp.GetReadResult(0), resp.GetReadRangeResult(0), ctx); ReadState(resp.GetReadResult(1), ctx); } -void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx) +void TPersQueue::ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& read, + const TActorContext& ctx) { + Y_VERIFY(read.HasStatus()); if (read.GetStatus() != NKikimrProto::OK && read.GetStatus() != NKikimrProto::NODATA) { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, - "Tablet " << TabletID() << " Config read error " << ctx.SelfID); + "Tablet " << TabletID() << " tx info read error " << ctx.SelfID); ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); return; } - Y_VERIFY(!ConfigInited); + switch (read.GetStatus()) { + case NKikimrProto::OK: { + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " has a tx info"); + + NKikimrPQ::TTabletTxInfo info; + Y_VERIFY(info.ParseFromString(read.GetValue())); + + LastStep = info.GetLastStep(); + LastTxId = info.GetLastTxId(); + + break; + } + case NKikimrProto::NODATA: { + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " doesn't have tx info"); + + LastStep = 0; + LastTxId = 0; + + break; + } + default: + Y_FAIL("Unexpected tx info read status: %d", read.GetStatus()); + } + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " LastStep " << LastStep << " LastTxId " << LastTxId); +} + +void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, + const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, + const TActorContext& ctx) +{ Y_VERIFY(read.HasStatus()); + if (read.GetStatus() != NKikimrProto::OK && read.GetStatus() != NKikimrProto::NODATA) { + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, + "Tablet " << TabletID() << " Config read error " << ctx.SelfID << + " Error status code " << read.GetStatus()); + ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); + return; + } + + Y_VERIFY(readRange.HasStatus()); + if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) { + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, + "Tablet " << TabletID() << " Transactions read error " << ctx.SelfID << + " Error status code " << readRange.GetStatus()); + ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill()); + return; + } + + Y_VERIFY(!ConfigInited); if (read.GetStatus() == NKikimrProto::OK) { bool res = Config.ParseFromString(read.GetValue()); @@ -756,12 +840,12 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& TopicName = Config.GetTopicName(); TopicPath = Config.GetTopicPath(); IsLocalDC = Config.GetLocalDC(); - auto& pqConfig = AppData(ctx)->PQConfig; - TopicConverterFactory = std::make_shared<NPersQueue::TTopicNamesConverterFactory>( - pqConfig, "", IsLocalDC - ); - TopicConverter = TopicConverterFactory->MakeTopicConverter(Config); - Y_VERIFY(TopicConverter->IsValid(), "%s", TopicConverter->GetReason().c_str()); + + CreateTopicConverter(Config, + TopicConverterFactory, + TopicConverter, + ctx); + KeySchema.clear(); KeySchema.reserve(Config.PartitionKeySchemaSize()); for (const auto& component : Config.GetPartitionKeySchema()) { @@ -782,15 +866,19 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& Y_FAIL("Unexpected config read status: %d", read.GetStatus()); } + THashMap<ui32, TVector<TTransaction>> partitionTxs; + InitTransactions(readRange, partitionTxs); + for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config const auto partitionId = partition.GetPartitionId(); Partitions.emplace(partitionId, TPartitionInfo( - ctx.Register(CreatePartitionActor(partitionId, Config, false, ctx)), + ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, false, ctx)), GetPartitionKeyRange(partition), false, *Counters )); } + ConfigInited = true; InitializeMeteringSink(ctx); @@ -812,7 +900,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& HasDataRequests.clear(); if (Partitions.empty()) { - SignalTabletActive(ctx); + OnInitComplete(ctx); } } @@ -1093,7 +1181,6 @@ void TPersQueue::Handle(TEvPQ::TEvTabletCacheCounters::TPtr& ev, const TActorCon << "Counters. CacheSize " << CacheCounters.CacheSizeBytes << " CachedBlobs " << CacheCounters.CacheSizeBlobs); } - void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& ctx) { auto it = Partitions.find(ev->Get()->Partition); @@ -1104,7 +1191,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c Y_VERIFY(ConfigInited);//partitions are inited only after config if (PartitionsInited == Partitions.size()) { - SignalTabletActive(ctx); + OnInitComplete(ctx); } if (NewConfigShouldBeApplied && PartitionsInited == Partitions.size()) { @@ -1189,6 +1276,21 @@ void TPersQueue::TrySendUpdateConfigResponses(const TActorContext& ctx) ChangeConfigNotification.clear(); } + +void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config, + NPersQueue::TConverterFactoryPtr& converterFactory, + NPersQueue::TTopicConverterPtr& topicConverter, + const TActorContext& ctx) +{ + auto& pqConfig = AppData(ctx)->PQConfig; + converterFactory = + std::make_shared<NPersQueue::TTopicNamesConverterFactory>(pqConfig, + "", + config.GetLocalDC()); + topicConverter = converterFactory->MakeTopicConverter(config); + Y_VERIFY(topicConverter); + Y_VERIFY(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str()); +} void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx) { @@ -1739,7 +1841,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p uncompressedSize = 0; LOG_DEBUG_S( ctx, NKikimrServices::PERSQUEUE, - "got client PART message topic: " << TopicConverter->GetClientsideName() << " partition: " << req.GetPartition() + "Tablet " << TabletID() << + " got client PART message topic: " << TopicConverter->GetClientsideName() << " partition: " << req.GetPartition() << " SourceId: \'" << EscapeC(msgs.back().SourceId) << "\' SeqNo: " << msgs.back().SeqNo << " partNo : " << msgs.back().PartNo << " messageNo: " << req.GetMessageNo() << " size: " << data.size() @@ -1755,7 +1858,9 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p cmd.GetExternalOperation(), cmd.GetIgnoreQuotaDeadline() }); } - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "got client message topic: " << TopicConverter->GetClientsideName() << + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Tablet " << TabletID() << + " got client message topic: " << TopicConverter->GetClientsideName() << " partition: " << req.GetPartition() << " SourceId: \'" << EscapeC(msgs.back().SourceId) << "\' SeqNo: " << msgs.back().SeqNo << " partNo : " << msgs.back().PartNo << @@ -2040,7 +2145,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ui32 partition = req.GetPartition(); auto it = Partitions.find(partition); - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "got client message batch for topic " << TopicConverter->GetClientsideName() << " partition " << partition << "\n"); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic " << TopicConverter->GetClientsideName() << " partition " << partition << "\n"); if (it == Partitions.end()) { ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::WRONG_PARTITION_NUMBER, @@ -2144,25 +2249,37 @@ void TPersQueue::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActo if (PipeClientCache->OnConnect(ev)) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, - "Connected to tablet " << ev->Get()->TabletId << " from tablet " << TabletID()); - } else { - if (ev->Get()->Dead) { - //AckRSToDeletedTablet(ev->Get()->TabletId, ctx); - } else { - LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE, - "Failed to connect to tablet " << ev->Get()->TabletId << " from tablet " << TabletID()); - //RestartPipeRS(ev->Get()->TabletId, ctx); - } + "Tablet " << TabletID() << + " Connected to tablet " << ev->Get()->TabletId); + return; } + + RestartPipe(ev->Get()->TabletId, ctx); } void TPersQueue::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, - "Client pipe to tablet " << ev->Get()->TabletId << " from " << TabletID() << " is reset"); + "Tablet " << TabletID() << + " Client pipe to tablet " << ev->Get()->TabletId << " is reset"); PipeClientCache->OnDisconnect(ev); - //RestartPipeRS(ev->Get()->TabletId, ctx); + + RestartPipe(ev->Get()->TabletId, ctx); +} + +void TPersQueue::RestartPipe(ui64 tabletId, const TActorContext& ctx) +{ + for (auto& txId: GetBindedTxs(tabletId)) { + auto* tx = GetTransaction(ctx, txId); + if (!tx) { + continue; + } + + for (auto& message : tx->GetBindedMsgs(tabletId)) { + PipeClientCache->Send(ctx, tabletId, message.Type, message.Data); + } + } } bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) @@ -2173,7 +2290,7 @@ bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc if (ev->Get()->Cgi().Has("kv")) { return TKeyValueFlat::OnRenderAppHtmlPage(ev, ctx); } - LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvRemoteHttpInfo: " << ev->Get()->Query); + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvRemoteHttpInfo: " << ev->Get()->Query); TMap<ui32, TActorId> res; for (auto& p : Partitions) { res.insert({p.first, p.second.Actor}); @@ -2244,12 +2361,22 @@ void TPersQueue::Handle(TEvInterconnect::TEvNodeInfo::TPtr& ev, const TActorCont { Y_VERIFY(ev->Get()->Node); DCId = ev->Get()->Node->Location.GetDataCenterId(); - ResourceMetrics = Executor()->GetResourceMetrics(); + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); request->Record.SetCookie(READ_CONFIG_COOKIE); + request->Record.AddCmdRead()->SetKey(KeyConfig()); request->Record.AddCmdRead()->SetKey(KeyState()); + request->Record.AddCmdRead()->SetKey(KeyTxInfo()); + + auto cmd = request->Record.AddCmdReadRange(); + cmd->MutableRange()->SetFrom(GetTxKey(Min<ui64>())); + cmd->MutableRange()->SetIncludeFrom(true); + cmd->MutableRange()->SetTo(GetTxKey(Max<ui64>())); + cmd->MutableRange()->SetIncludeTo(true); + cmd->SetIncludeData(true); + request->Record.MutableCmdSetExecutorFastLogPolicy() ->SetIsAllowed(AppData(ctx)->PQConfig.GetTactic() == NKikimrClient::TKeyValueRequest::MIN_LATENCY); ctx.Send(ctx.SelfID, request.Release()); @@ -2272,7 +2399,7 @@ void TPersQueue::HandleWakeup(const TActorContext& ctx) { void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Handle TEvPersQueue::TEvProposeTransaction"); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvProposeTransaction"); NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record; switch (event.GetTxBodyCase()) { @@ -2283,7 +2410,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc HandleConfigTransaction(ev->Release(), ctx); break; case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET: - SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()), + SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()), event.GetTxId(), ctx); break; @@ -2305,7 +2432,8 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact bool isWriteOperation = !operation.HasBegin(); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, - "tx=" << event.GetTxId() << + "Tablet " << TabletID() << + " tx=" << event.GetTxId() << ", lock_tx_id=" << txBody.GetLockTxId() << ", path=" << operation.GetPath() << ", partition=" << operation.GetPartitionId() << @@ -2316,7 +2444,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact } if (TabletState != NKikimrPQ::ENormal) { - SendProposeTransactionAbort(ActorIdFromProto(event.GetSource()), + SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()), event.GetTxId(), ctx); return; @@ -2384,6 +2512,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte if (tx->State == NKikimrPQ::TTransaction::WAIT_RS) { CheckTxState(ctx, *tx); + + TryWriteTxs(ctx); } } else if (ack) { // @@ -2404,9 +2534,12 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorCo } tx->OnReadSetAck(event); + tx->UnbindMsgsFromPipe(event.GetTabletConsumer()); if (tx->State == NKikimrPQ::TTransaction::EXECUTED) { CheckTxState(ctx, *tx); + + TryWriteTxs(ctx); } } @@ -2435,6 +2568,8 @@ void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const return; } + Y_VERIFY(tx->State == NKikimrPQ::TTransaction::CALCULATING); + tx->OnProposePartitionConfigResult(event); CheckTxState(ctx, *tx); @@ -2451,6 +2586,8 @@ void TPersQueue::Handle(TEvPQ::TEvTxCommitDone::TPtr& ev, const TActorContext& c return; } + Y_VERIFY(tx->State == NKikimrPQ::TTransaction::EXECUTING); + tx->OnTxCommitDone(event); CheckTxState(ctx, *tx); @@ -2479,6 +2616,8 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx) WriteTxsInProgress = true; ctx.Send(ctx.SelfID, request.Release()); + + TryReturnTabletStateAll(ctx); } void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp, @@ -2505,7 +2644,6 @@ void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp, } SendReplies(ctx); - TryReturnTabletStateAll(ctx); CheckChangedTxStates(ctx); WriteTxsInProgress = false; @@ -2529,13 +2667,23 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) const NKikimrPQ::TEvProposeTransaction& event = front->Record; TDistributedTransaction& tx = Txs[event.GetTxId()]; - if (tx.State != NKikimrPQ::TTransaction::UNKNOWN) { - continue; + switch (tx.State) { + case NKikimrPQ::TTransaction::UNKNOWN: { + tx.OnProposeTransaction(event, GetAllowedStep()); + CheckTxState(ctx, tx); + break; + } + case NKikimrPQ::TTransaction::PREPARING: { + break; + } + case NKikimrPQ::TTransaction::PREPARED: { + ScheduleProposeTransactionResult(tx); + break; + } + default: { + Y_FAIL(); + } } - - tx.OnProposeTransaction(event, GetAllowedStep()); - - CheckTxState(ctx, tx); } } @@ -2563,11 +2711,11 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) txAcks[ActorIdFromProto(tx.GetAckTo())].push_back(tx.GetTxId()); } - if (step > LastStep) { - ui64 lastTxId = 0; + if (step >= LastStep) { + ui64 lastPlannedTxId = 0; for (ui64 txId : txIds) { - Y_VERIFY(lastTxId < txId); + Y_VERIFY(lastPlannedTxId < txId); if (auto p = Txs.find(txId); p != Txs.end()) { TDistributedTransaction& tx = p->second; @@ -2581,27 +2729,31 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) TxQueue.emplace(step, txId); } else { LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, - "Transaction already planned for step " << tx.Step << - ". TabletId: " << TabletID() << + "Tablet " << TabletID() << + " Transaction already planned for step " << tx.Step << ", Step: " << step << ", TxId: " << txId); + + if (tx.State > NKikimrPQ::TTransaction::PLANNING) { + SendEvProposeTransactionResult(ctx, tx); + } } } else { LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, - "Unknown transaction " << txId << - ". TabletId: " << TabletID() << + "Tablet " << TabletID() << + " Unknown transaction " << txId << ", Step: " << step); } - LastTxId = txId; + lastPlannedTxId = txId; } LastStep = step; - LastTxId = lastTxId; + LastTxId = lastPlannedTxId; } else { LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE, - "Old plan step " << step << - ". TabletId: " << TabletID() << + "Tablet " << TabletID() << + " Old plan step " << step << ", LastStep: " << LastStep); } @@ -2686,27 +2838,31 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction& event->Record.SetMinStep(tx.MinStep); event->Record.SetMaxStep(tx.MaxStep); - Replies.emplace_back(tx.Source, std::move(event)); + if (tx.SourceTablet != InvalidTabletId) { + RepliesToPipe.emplace_back(tx.SourceTablet, tx.TxId, std::move(event)); + } else { + RepliesToActor.emplace_back(tx.SourceActor, std::move(event)); + } } void TPersQueue::SchedulePlanStepAck(ui64 step, const THashMap<TActorId, TVector<ui64>>& txAcks) { - for (auto& [target, txIds] : txAcks) { + for (auto& [actorId, txIds] : txAcks) { auto event = std::make_unique<TEvTxProcessing::TEvPlanStepAck>(TabletID(), step, txIds.begin(), txIds.end()); - Replies.emplace_back(target, event.release()); + RepliesToActor.emplace_back(actorId, std::move(event)); } } -void TPersQueue::SchedulePlanStepAccepted(const TActorId& target, +void TPersQueue::SchedulePlanStepAccepted(const TActorId& actorId, ui64 step) { auto event = std::make_unique<TEvTxProcessing::TEvPlanStepAccepted>(TabletID(), step); - Replies.emplace_back(target, event.release()); + RepliesToActor.emplace_back(actorId, std::move(event)); } void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, @@ -2726,7 +2882,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx, TabletID(), body, 0); - PipeClientCache->Send(ctx, receiverId, event.release()); + SendToPipe(receiverId, tx, std::move(event), ctx); } tx.ReadSetAcks.clear(); @@ -2800,7 +2956,7 @@ void TPersQueue::SendEvTxRollbackToPartitions(const TActorContext& ctx, } void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx, - const TDistributedTransaction& tx) + TDistributedTransaction& tx) { auto result = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>(); auto status = @@ -2811,7 +2967,46 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx, result->Record.SetTxId(tx.TxId); result->Record.SetStep(tx.Step); - ctx.Send(tx.Source, result.release()); + if (tx.SourceTablet != InvalidTabletId) { + SendToPipe(tx.SourceTablet, tx, std::move(result), ctx); + } else { + ctx.Send(tx.SourceActor, std::move(result)); + } +} + +void TPersQueue::SendToPipe(ui64 tabletId, + TDistributedTransaction& tx, + std::unique_ptr<IEventBase> event, + const TActorContext& ctx) +{ + Y_VERIFY(event); + + BindTxToPipe(tabletId, tx.TxId); + tx.BindMsgToPipe(tabletId, *event); + PipeClientCache->Send(ctx, tabletId, event.release()); +} + +void TPersQueue::BindTxToPipe(ui64 tabletId, ui64 txId) +{ + BindedTxs[tabletId].insert(txId); +} + +void TPersQueue::UnbindTxFromPipe(ui64 tabletId, ui64 txId) +{ + if (auto p = BindedTxs.find(tabletId); p != BindedTxs.end()) { + p->second.erase(txId); + } +} + +const THashSet<ui64>& TPersQueue::GetBindedTxs(ui64 tabletId) +{ + if (auto p = BindedTxs.find(tabletId); p != BindedTxs.end()) { + return p->second; + } + + static THashSet<ui64> empty; + + return empty; } TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx, @@ -2820,8 +3015,8 @@ TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx, auto p = Txs.find(txId); if (p == Txs.end()) { LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, - "Unknown transaction " << txId << - ". TabletId: " << TabletID()); + "Tablet " << TabletID() << + " Unknown transaction " << txId); return nullptr; } return &p->second; @@ -2882,10 +3077,18 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, case NKikimrPQ::TTransaction::KIND_DATA: SendEvTxCalcPredicateToPartitions(ctx, tx); break; - case NKikimrPQ::TTransaction::KIND_CONFIG: - CreateNewPartitions(tx.TabletConfig, ctx); + case NKikimrPQ::TTransaction::KIND_CONFIG: { + NPersQueue::TConverterFactoryPtr converterFactory; + CreateTopicConverter(tx.TabletConfig, + converterFactory, + tx.TopicConverter, + ctx); + CreateNewPartitions(tx.TabletConfig, + tx.TopicConverter, + ctx); SendEvProposePartitionConfig(ctx, tx); break; + } case NKikimrPQ::TTransaction::KIND_UNKNOWN: Y_VERIFY(false); } @@ -2908,7 +3111,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, tx.State = NKikimrPQ::TTransaction::CALCULATED; break; case NKikimrPQ::TTransaction::KIND_CONFIG: + SendEvReadSetToReceivers(ctx, tx); + tx.State = NKikimrPQ::TTransaction::WAIT_RS; + CheckTxState(ctx, tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: @@ -2928,7 +3134,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, [[fallthrough]]; case NKikimrPQ::TTransaction::WAIT_RS: - Y_VERIFY(tx.ReadSetAcks.size() <= tx.Senders.size()); + Y_VERIFY(tx.ReadSetAcks.size() <= tx.Receivers.size()); if (tx.HaveParticipantsDecision()) { SendEvProposeTransactionResult(ctx, tx); @@ -2969,12 +3175,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, tx.State = NKikimrPQ::TTransaction::EXECUTED; TxQueue.pop(); - if (!TxQueue.empty()) { - auto next = GetTransaction(ctx, TxQueue.front().second); - Y_VERIFY(next); - - CheckTxState(ctx, *next); - } + TryStartTransaction(ctx); } else { break; } @@ -3006,10 +3207,26 @@ void TPersQueue::DeleteTx(TDistributedTransaction& tx) void TPersQueue::SendReplies(const TActorContext& ctx) { - for (auto& [actorId, event] : Replies) { + SendRepliesToActors(ctx); + SendRepliesToPipes(ctx); +} + +void TPersQueue::SendRepliesToActors(const TActorContext& ctx) +{ + for (auto& [actorId, event] : RepliesToActor) { ctx.Send(actorId, event.release()); } - Replies.clear(); + RepliesToActor.clear(); +} + +void TPersQueue::SendRepliesToPipes(const TActorContext& ctx) +{ + for (auto& [tabletId, txId, event] : RepliesToPipe) { + auto* tx = GetTransaction(ctx, txId); + Y_VERIFY(tx); + SendToPipe(tabletId, *tx, std::move(event), ctx); + } + RepliesToPipe.clear(); } void TPersQueue::CheckChangedTxStates(const TActorContext& ctx) @@ -3047,7 +3264,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, for (auto& [_, partition] : Partitions) { auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId); - event->TopicConverter = TopicConverter; + event->TopicConverter = tx.TopicConverter; event->Config = tx.TabletConfig; ctx.Send(partition.Actor, std::move(event)); @@ -3058,6 +3275,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, } TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId, + const NPersQueue::TTopicConverterPtr topicConverter, const NKikimrPQ::TPQTabletConfig& config, bool newPartition, const TActorContext& ctx) @@ -3066,7 +3284,7 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId, partitionId, ctx.SelfID, CacheActor, - TopicConverter, + topicConverter, IsLocalDC, DCId, IsServerless, @@ -3077,6 +3295,7 @@ TPartition* TPersQueue::CreatePartitionActor(ui32 partitionId, } void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, + NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx) { EnsurePartitionsAreNotDeleted(config); @@ -3095,7 +3314,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, continue; } - TActorId actorId = ctx.Register(CreatePartitionActor(partitionId, config, true, ctx)); + TActorId actorId = ctx.Register(CreatePartitionActor(partitionId, topicConverter, config, true, ctx)); Partitions.emplace(std::piecewise_construct, std::forward_as_tuple(partitionId), @@ -3126,6 +3345,55 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId()); } } + +void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, + THashMap<ui32, TVector<TTransaction>>& partitionTxs) +{ + Txs.clear(); + TxQueue.clear(); + + std::deque<std::pair<ui64, ui64>> plannedTxs; + + for (size_t i = 0; i < readRange.PairSize(); ++i) { + auto& pair = readRange.GetPair(i); + + NKikimrPQ::TTransaction tx; + Y_VERIFY(tx.ParseFromString(pair.GetValue())); + + Txs.emplace(tx.GetTxId(), tx); + + if (tx.HasStep()) { + if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(LastStep, LastTxId)) { + plannedTxs.emplace_back(tx.GetStep(), tx.GetTxId()); + } + } + } + + std::sort(plannedTxs.begin(), plannedTxs.end()); + for (auto& item : plannedTxs) { + TxQueue.push(item); + } + + Y_UNUSED(partitionTxs); +} + +void TPersQueue::TryStartTransaction(const TActorContext& ctx) +{ + if (TxQueue.empty()) { + return; + } + + auto next = GetTransaction(ctx, TxQueue.front().second); + Y_VERIFY(next); + + CheckTxState(ctx, *next); +} + +void TPersQueue::OnInitComplete(const TActorContext& ctx) +{ + SignalTabletActive(ctx); + TryStartTransaction(ctx); +} ui64 TPersQueue::GetAllowedStep() const { diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 569121c7f90..ac21b3834e9 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -22,6 +22,8 @@ struct TChangeNotification; class TResponseBuilder; class TPartition; +struct TTransaction; + //USES MAIN chanel for big blobs, INLINE or EXTRA for ZK-like load, EXTRA2 for small blob for logging (VDISK of type LOG is ok with EXTRA2) class TPersQueue : public NKeyValue::TKeyValueFlat { @@ -99,7 +101,11 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { const TActorContext& ctx); void HandleStateWriteResponse(const NKikimrClient::TResponse& resp, const TActorContext& ctx); - void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx); + void ReadTxInfo(const NKikimrClient::TKeyValueResponse::TReadResult& read, + const TActorContext& ctx); + void ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& read, + const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, + const TActorContext& ctx); void ReadState(const NKikimrClient::TKeyValueResponse::TReadResult& read, const TActorContext& ctx); void InitializeMeteringSink(const TActorContext& ctx); @@ -113,6 +119,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { NPersQueue::NErrorCode::EErrorCode& code, TString& error) const; void TrySendUpdateConfigResponses(const TActorContext& ctx); + static void CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config, + NPersQueue::TConverterFactoryPtr& converterFactory, + NPersQueue::TTopicConverterPtr& topicConverter, + const TActorContext& ctx); //client request void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx); @@ -227,7 +237,12 @@ private: TMaybe<NKikimrPQ::TPQTabletConfig> TabletConfigTx; TMaybe<NKikimrPQ::TBootstrapConfig> BootstrapConfigTx; bool WriteTxsInProgress = false; - TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies; + + struct TReplyToActor; + struct TReplyToPipe; + + TVector<TReplyToActor> RepliesToActor; + TVector<TReplyToPipe> RepliesToPipe; TIntrusivePtr<NTabletPipe::TBoundedClientCacheConfig> PipeClientCacheConfig; THolder<NTabletPipe::IClientCache> PipeClientCache; @@ -262,7 +277,7 @@ private: void SendEvTxRollbackToPartitions(const TActorContext& ctx, TDistributedTransaction& tx); void SendEvProposeTransactionResult(const TActorContext& ctx, - const TDistributedTransaction& tx); + TDistributedTransaction& tx); TDistributedTransaction* GetTransaction(const TActorContext& ctx, ui64 txId); @@ -274,6 +289,8 @@ private: void DeleteTx(TDistributedTransaction& tx); void SendReplies(const TActorContext& ctx); + void SendRepliesToActors(const TActorContext& ctx); + void SendRepliesToPipes(const TActorContext& ctx); void CheckChangedTxStates(const TActorContext& ctx); bool AllTransactionsHaveBeenProcessed() const; @@ -296,10 +313,12 @@ private: TDistributedTransaction& tx); TPartition* CreatePartitionActor(ui32 partitionId, + const NPersQueue::TTopicConverterPtr topicConverter, const NKikimrPQ::TPQTabletConfig& config, bool newPartition, const TActorContext& ctx); void CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config, + NPersQueue::TTopicConverterPtr topicConverter, const TActorContext& ctx); void EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const; @@ -314,6 +333,24 @@ private: const TActorContext& ctx); void ClearNewConfig(); + + void SendToPipe(ui64 tabletId, + TDistributedTransaction& tx, + std::unique_ptr<IEventBase> event, + const TActorContext& ctx); + + void InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange, + THashMap<ui32, TVector<TTransaction>>& partitionTxs); + void TryStartTransaction(const TActorContext& ctx); + void OnInitComplete(const TActorContext& ctx); + + void RestartPipe(ui64 tabletId, const TActorContext& ctx); + + void BindTxToPipe(ui64 tabletId, ui64 txId); + void UnbindTxFromPipe(ui64 tabletId, ui64 txId); + const THashSet<ui64>& GetBindedTxs(ui64 tabletId); + + THashMap<ui64, THashSet<ui64>> BindedTxs; }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 1e7bb541152..80a428f650a 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -1,32 +1,134 @@ #include "transaction.h" namespace NKikimr::NPQ { + +TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& tx) : + TDistributedTransaction() +{ + Kind = tx.GetKind(); + if (tx.HasStep()) { + Step = tx.GetStep(); + } + TxId = tx.GetTxId(); + State = tx.GetState(); + MinStep = tx.GetMinStep(); + MaxStep = tx.GetMaxStep(); + + switch (Kind) { + case NKikimrPQ::TTransaction::KIND_DATA: + InitDataTransaction(tx); + break; + case NKikimrPQ::TTransaction::KIND_CONFIG: + InitConfigTransaction(tx); + break; + case NKikimrPQ::TTransaction::KIND_UNKNOWN: + Y_FAIL_S("unknown transaction type"); + } + + if (tx.HasSelfPredicate()) { + SelfDecision = + tx.GetSelfPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; + } + if (tx.HasAggrPredicate()) { + ParticipantsDecision = + tx.GetAggrPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; + } + + if (tx.HasTablet()) { + SourceTablet = tx.GetTablet(); + } else { + Y_VERIFY(tx.HasActor()); + SourceActor = ActorIdFromProto(tx.GetActor()); + } +} + +void TDistributedTransaction::InitDataTransaction(const NKikimrPQ::TTransaction& tx) +{ + for (ui64 tabletId : tx.GetSenders()) { + Senders.insert(tabletId); + } + for (ui64 tabletId : tx.GetReceivers()) { + Receivers.insert(tabletId); + } + + InitPartitions(tx.GetOperations()); +} + +void TDistributedTransaction::InitPartitions(const google::protobuf::RepeatedPtrField<NKikimrPQ::TPartitionOperation>& operations) +{ + Partitions.clear(); + + for (auto& o : operations) { + Operations.push_back(o); + Partitions.insert(o.GetPartitionId()); + } +} + +void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransaction& tx) +{ + Y_VERIFY(tx.HasSchemeShardId()); + + Receivers.insert(tx.GetSchemeShardId()); + + TabletConfig = tx.GetTabletConfig(); + BootstrapConfig = tx.GetBootstrapConfig(); + + InitPartitions(TabletConfig); +} + +void TDistributedTransaction::InitPartitions(const NKikimrPQ::TPQTabletConfig& config) +{ + Partitions.clear(); + + if (config.PartitionsSize()) { + for (const auto& partition : config.GetPartitions()) { + Partitions.insert(partition.GetPartitionId()); + } + } else { + for (auto partitionId : config.GetPartitionIds()) { + Partitions.insert(partitionId); + } + } +} void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event, ui64 minStep) { Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET); + Y_VERIFY(event.GetSourceCase() != NKikimrPQ::TEvProposeTransaction::SOURCE_NOT_SET); Y_VERIFY(TxId == Max<ui64>()); TxId = event.GetTxId(); MinStep = minStep; - MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds(); switch (event.GetTxBodyCase()) { case NKikimrPQ::TEvProposeTransaction::kData: Y_VERIFY(event.HasData()); + MaxStep = MinStep + TDuration::Seconds(30).MilliSeconds(); OnProposeTransaction(event.GetData()); break; case NKikimrPQ::TEvProposeTransaction::kConfig: Y_VERIFY(event.HasConfig()); + MaxStep = Max<ui64>(); OnProposeTransaction(event.GetConfig()); break; - case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET: - break; + default: + Y_FAIL_S("unknown TxBody case"); } - Source = ActorIdFromProto(event.GetSource()); + switch (event.GetSourceCase()) { + case NKikimrPQ::TEvProposeTransaction::kActor: + Y_VERIFY(event.HasActor()); + SourceActor = ActorIdFromProto(event.GetActor()); + break; + case NKikimrPQ::TEvProposeTransaction::kTablet: + Y_VERIFY(event.HasTablet()); + SourceTablet = event.GetTablet(); + break; + default: + Y_FAIL_S("unknown Source case"); + } } void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody) @@ -41,10 +143,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac Receivers.insert(tablet); } - for (auto& operation : txBody.GetOperations()) { - Operations.push_back(operation); - Partitions.insert(operation.GetPartitionId()); - } + InitPartitions(txBody.GetOperations()); PartitionRepliesCount = 0; PartitionRepliesExpected = 0; @@ -54,20 +153,16 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody) { + Y_VERIFY(txBody.HasSchemeShardId()); + Kind = NKikimrPQ::TTransaction::KIND_CONFIG; + Receivers.insert(txBody.GetSchemeShardId()); + TabletConfig = txBody.GetTabletConfig(); BootstrapConfig = txBody.GetBootstrapConfig(); - if (TabletConfig.PartitionsSize()) { - for (const auto& partition : TabletConfig.GetPartitions()) { - Partitions.insert(partition.GetPartitionId()); - } - } else { - for (auto partitionId : TabletConfig.GetPartitionIds()) { - Partitions.insert(partitionId); - } - } + InitPartitions(TabletConfig); PartitionRepliesCount = 0; PartitionRepliesExpected = 0; @@ -85,22 +180,25 @@ void TDistributedTransaction::OnPlanStep(ui64 step) void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPredicateResult& event) { - Y_VERIFY(Step == event.Step); - Y_VERIFY(TxId == event.TxId); - - Y_VERIFY(Partitions.contains(event.Partition)); - - SetDecision(event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT); - - ++PartitionRepliesCount; + OnPartitionResult(event, + event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT); } void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event) { + OnPartitionResult(event, + NKikimrTx::TReadSetData::DECISION_COMMIT); +} + +template<class E> +void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decision) +{ Y_VERIFY(Step == event.Step); Y_VERIFY(TxId == event.TxId); - SetDecision(NKikimrTx::TReadSetData::DECISION_COMMIT); + Y_VERIFY(Partitions.contains(event.Partition)); + + SetDecision(decision); ++PartitionRepliesCount; } @@ -206,7 +304,14 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque AddCmdWriteConfigTx(tx); break; case NKikimrPQ::TTransaction::KIND_UNKNOWN: - break; + Y_FAIL_S("unknown transaction type"); + } + + if (SourceTablet != Max<ui64>()) { + tx.SetTablet(SourceTablet); + } else { + Y_VERIFY(SourceActor != TActorId()); + ActorIdToProto(SourceActor, tx.MutableActor()); } TString value; @@ -236,6 +341,10 @@ void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx) void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx) { + Y_VERIFY(Receivers.size() == 1); + + tx.SetSchemeShardId(*Receivers.begin()); + *tx.MutableTabletConfig() = TabletConfig; *tx.MutableBootstrapConfig() = BootstrapConfig; } @@ -259,7 +368,33 @@ void TDistributedTransaction::SetDecision(NKikimrTx::TReadSetData::EDecision& va TString TDistributedTransaction::GetKey() const { - return Sprintf("tx_%lu", TxId); + return GetTxKey(TxId); +} + +void TDistributedTransaction::BindMsgToPipe(ui64 tabletId, const IEventBase& event) +{ + Y_VERIFY(event.IsSerializable()); + + TAllocChunkSerializer serializer; + Y_VERIFY(event.SerializeToArcadiaStream(&serializer)); + auto data = serializer.Release(event.CreateSerializationInfo()); + OutputMsgs[tabletId].emplace_back(event.Type(), std::move(data)); +} + +void TDistributedTransaction::UnbindMsgsFromPipe(ui64 tabletId) +{ + OutputMsgs.erase(tabletId); +} + +auto TDistributedTransaction::GetBindedMsgs(ui64 tabletId) -> const TVector<TSerializedMessage>& +{ + if (auto p = OutputMsgs.find(tabletId); p != OutputMsgs.end()) { + return p->second; + } + + static TVector<TSerializedMessage> empty; + + return empty; } } diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index cbe3d787aaa..ab58a4b7a5f 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -17,6 +17,7 @@ namespace NKikimr::NPQ { struct TDistributedTransaction { TDistributedTransaction() = default; + explicit TDistributedTransaction(const NKikimrPQ::TTransaction& tx); void OnProposeTransaction(const NKikimrPQ::TEvProposeTransaction& event, ui64 minStep); @@ -47,7 +48,8 @@ struct TDistributedTransaction { EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; - NActors::TActorId Source; // отправитель TEvProposeTransaction + NActors::TActorId SourceActor; // отправитель TEvProposeTransaction + ui64 SourceTablet = Max<ui64>(); THashSet<ui32> Partitions; // список участвующих партиций size_t PartitionRepliesCount = 0; @@ -59,6 +61,7 @@ struct TDistributedTransaction { NKikimrPQ::TPQTabletConfig TabletConfig; NKikimrPQ::TBootstrapConfig BootstrapConfig; + NPersQueue::TTopicConverterPtr TopicConverter; bool WriteInProgress = false; @@ -79,6 +82,32 @@ struct TDistributedTransaction { void AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx); void AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx); + + void InitDataTransaction(const NKikimrPQ::TTransaction& tx); + void InitConfigTransaction(const NKikimrPQ::TTransaction& tx); + + void InitPartitions(const google::protobuf::RepeatedPtrField<NKikimrPQ::TPartitionOperation>& tx); + void InitPartitions(const NKikimrPQ::TPQTabletConfig& config); + + template<class E> + void OnPartitionResult(const E& event, EDecision decision); + + struct TSerializedMessage { + ui32 Type; + TIntrusivePtr<TEventSerializedData> Data; + + TSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data) : + Type(type), + Data(data) + { + } + }; + + THashMap<ui64, TVector<TSerializedMessage>> OutputMsgs; + + void BindMsgToPipe(ui64 tabletId, const IEventBase& event); + void UnbindMsgsFromPipe(ui64 tabletId); + const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId); }; } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 62b89f4d0b7..16df7691622 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -731,7 +731,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition, { auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); - ActorIdToProto(Ctx->Edge, event->Record.MutableSource()); + ActorIdToProto(Ctx->Edge, event->Record.MutableActor()); auto* body = event->Record.MutableData(); auto* operation = body->MutableOperations()->Add(); operation->SetPartitionId(partition); diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index bcfa18cf72a..1013ccfd26f 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -38,6 +38,7 @@ struct TTxOperation { struct TConfigParams { TMaybe<NKikimrPQ::TPQTabletConfig> Tablet; TMaybe<NKikimrPQ::TBootstrapConfig> Bootstrap; + ui64 SchemeShardId = 0; }; struct TProposeTransactionParams { @@ -215,7 +216,7 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); THashSet<ui32> partitions; - ActorIdToProto(Ctx->Edge, event->Record.MutableSource()); + ActorIdToProto(Ctx->Edge, event->Record.MutableActor()); event->Record.SetTxId(params.TxId); if (params.Configs) { @@ -223,6 +224,9 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa // TxBody.Config // auto* body = event->Record.MutableConfig(); + if (params.Configs->SchemeShardId) { + body->SetSchemeShardId(params.Configs->SchemeShardId); + } if (params.Configs->Tablet.Defined()) { *body->MutableTabletConfig() = *params.Configs->Tablet; } @@ -746,6 +750,7 @@ Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture) Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture) { + NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222); PQTabletPrepare({.partitions=2}, {}, *Ctx); const ui64 txId = 67890; @@ -759,7 +764,8 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture) SendProposeTransactionRequest({.TxId=txId, .Configs=NHelpers::TConfigParams{ .Tablet=tabletConfig, - .Bootstrap=NHelpers::MakeBootstrapConfig() + .Bootstrap=NHelpers::MakeBootstrapConfig(), + .SchemeShardId = 22222 }}); WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); @@ -769,12 +775,16 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture) WaitPlanStepAck({.Step=100, .TxIds={txId}}); WaitPlanStepAccepted({.Step=100}); + WaitReadSet(*schemeshard, {.Step=100, .TxId=txId, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId}); + schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId}); + WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); } Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) { + NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222); PQTabletPrepare({.partitions=2}, {}, *Ctx); const ui64 txId_2 = 67891; @@ -790,7 +800,8 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) SendProposeTransactionRequest({.TxId=txId_2, .Configs=NHelpers::TConfigParams{ .Tablet=tabletConfig, - .Bootstrap=NHelpers::MakeBootstrapConfig() + .Bootstrap=NHelpers::MakeBootstrapConfig(), + .SchemeShardId = 22222 }}); SendProposeTransactionRequest({.TxId=txId_3, .TxOps={ @@ -808,6 +819,9 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) WaitPlanStepAck({.Step=100, .TxIds={txId_2, txId_3}}); WaitPlanStepAccepted({.Step=100}); + WaitReadSet(*schemeshard, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId}); + schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId}); + WaitProposeTransactionResponse({.TxId=txId_2, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); WaitProposeTransactionResponse({.TxId=txId_3, diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 9caedbecdca..3dc3dc5bc27 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -826,7 +826,10 @@ message TConfigTransaction { } message TEvProposeTransaction { - optional NActorsProto.TActorId Source = 1; + oneof Source { + NActorsProto.TActorId Actor = 1; + uint64 Tablet = 5; + } optional uint64 TxId = 2; oneof TxBody { TDataTransaction Data = 3; @@ -909,7 +912,7 @@ message TTransaction { repeated uint64 Receivers = 6; repeated TPartitionOperation Operations = 7; optional bool SelfPredicate = 9; // только предикаты партиций. предикаты коллег отдельно - optional bool AggrPredicate = 10; // заполненено одно из полей Senders or AggrPredicate + optional bool AggrPredicate = 10; // // TConfigTransaction @@ -917,6 +920,14 @@ message TTransaction { optional TPQTabletConfig TabletConfig = 12; optional TBootstrapConfig BootstrapConfig = 13; optional uint64 SchemeShardId = 16; + + // + // получатель результата + // + oneof Source { + NActorsProto.TActorId Actor = 14; + uint64 Tablet = 15; + } }; message TTabletTxInfo { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 3da9bafe66f..9b266eb70f0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -605,7 +605,7 @@ bool CollectSchemaChanged(const TOperationId& operationId, const auto& evRecord = ev->Get()->Record; if (evRecord.GetStatus() == NKikimrPQ::TEvProposeTransactionResult::COMPLETE) { const auto ssId = context.SS->SelfTabletId(); - const auto shardId = TTabletId(evRecord.GetOrigin()); + const TTabletId shardId(evRecord.GetOrigin()); const auto shardIdx = context.SS->MustGetShardIdx(shardId); Y_VERIFY(context.SS->ShardInfos.contains(shardIdx)); @@ -649,10 +649,12 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans const TString& databaseId, const TString& databasePath, TTxState::ETxType txType, - TTabletId ssId) + TTabletId ssId, + const TOperationContext& context) { auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); event->Record.SetTxId(ui64(txId)); + event->Record.SetTablet(ui64(ssId)); event->Record.MutableConfig()->SetSchemeShardId(ui64(ssId)); MakePQTabletConfig(*event->Record.MutableConfig()->MutableTabletConfig(), @@ -668,6 +670,10 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans pqGroup, txType); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Propose configure PersQueue" << + ", message: " << event->Record.ShortUtf8DebugString()); + return event; } @@ -680,7 +686,8 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId const TString& folderId, const TString& databaseId, const TString& databasePath, - TTxState::ETxType txType) + TTxState::ETxType txType, + const TOperationContext& context) { auto event = MakeHolder<TEvPersQueue::TEvUpdateConfig>(); event->Record.SetTxId(ui64(txId)); @@ -698,6 +705,10 @@ THolder<TEvPersQueue::TEvUpdateConfig> TConfigureParts::MakeEvUpdateConfig(TTxId pqGroup, txType); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Propose configure PersQueue" << + ", message: " << event->Record.ShortUtf8DebugString()); + return event; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index a0eae5139e6..b9d2b83e5a3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -686,7 +686,8 @@ public: databaseId, databasePath, txState->TxType, - ssId); + ssId, + context); } else { event = MakeEvUpdateConfig(OperationId.GetTxId(), *pqGroup, @@ -697,7 +698,8 @@ public: folderId, databaseId, databasePath, - txState->TxType); + txState->TxType, + context); } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, @@ -849,7 +851,8 @@ private: const TString& databaseId, const TString& databasePath, TTxState::ETxType txType, - TTabletId ssId); + TTabletId ssId, + const TOperationContext& context); static THolder<TEvPersQueue::TEvUpdateConfig> MakeEvUpdateConfig(TTxId txId, const TTopicInfo& pqGroup, @@ -860,7 +863,8 @@ private: const TString& folderId, const TString& databaseId, const TString& databasePath, - TTxState::ETxType txType); + TTxState::ETxType txType, + const TOperationContext& context); }; class TPropose: public TSubOperationState { diff --git a/ydb/core/tx/schemeshard/ut_pq_reboots.cpp b/ydb/core/tx/schemeshard/ut_pq_reboots.cpp index 86278552974..d980a16280b 100644 --- a/ydb/core/tx/schemeshard/ut_pq_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_pq_reboots.cpp @@ -20,7 +20,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { Y_UNIT_TEST(Create) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + t.Runtime->SetScheduledLimit(400); + TestCreatePQGroup(runtime, ++t.TxId, "/MyRoot/DirA", "Name: \"PQGroup_2\"" "TotalGroupCount: 10 " @@ -51,7 +55,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { Y_UNIT_TEST(CreateMultiplePqTablets) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + t.Runtime->SetScheduledLimit(400); + TestCreatePQGroup(runtime, ++t.TxId, "/MyRoot/DirA", "Name: \"PQGroup_2\"" "TotalGroupCount: 2 " @@ -140,7 +148,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { Y_UNIT_TEST(AlterWithReboots) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + t.Runtime->SetScheduledLimit(400); + TPathVersion pqVer; { TInactiveZone inactive(activeZone); @@ -201,7 +213,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { Y_UNIT_TEST(CreateAlter) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + t.Runtime->SetScheduledLimit(400); + t.RestoreLogging(); AsyncCreatePQGroup(runtime, t.TxId++, "/MyRoot/DirA", @@ -250,7 +266,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { Y_UNIT_TEST(CreateDrop) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& /*activeZone*/) { + t.Runtime->SetScheduledLimit(400); + t.RestoreLogging(); TestCreatePQGroup(runtime, t.TxId++, "/MyRoot/DirA", GroupConfig); @@ -284,7 +304,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { Y_UNIT_TEST(CreateDropAbort) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& /*activeZone*/) { + t.Runtime->SetScheduledLimit(400); + t.RestoreLogging(); ui64& txId = t.TxId; @@ -310,7 +334,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { //Handle(): requirement std::make_pair(msg->CollectGeneration, msg->CollectStep) >= barrier.MakeCollectPair() failed /*Y_UNIT_TEST(CreateAlterAlterDrop) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + t.Runtime->SetScheduledLimit(400); + t.RestoreLogging(); ui64& txId = t.TxId; @@ -337,7 +365,11 @@ Y_UNIT_TEST_SUITE(TPqGroupTestReboots) { Y_UNIT_TEST(CreateAlterDropPqGroupWithReboots) { TTestWithReboots t; + t.GetTestEnvOptions().EnablePQConfigTransactionsAtSchemeShard(true); + t.Run([&](TTestActorRuntime& runtime, bool& /*activeZone*/) { + t.Runtime->SetScheduledLimit(400); + using ESts = NKikimrScheme::EStatus; t.RestoreLogging(); |