diff options
author | abcdef <akotov@ydb.tech> | 2023-05-18 09:05:33 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-05-18 09:05:33 +0300 |
commit | f6237db2054554be8c0799a5a39a58c00701a3a8 (patch) | |
tree | 6de9e42969a65568779054c2ec6ffbab8c7beeae | |
parent | 26fb4330a3aaf4e8ee67e0ebf130fedbd8f8bb5b (diff) | |
download | ydb-f6237db2054554be8c0799a5a39a58c00701a3a8.tar.gz |
the PQ tablet sends the TEvProposedTransactionResult response directly
Исправлена ошибка с отправкой `TEvProposeTransactionResult`. В некоторых случаях оно отправлялось через pipe. По соглашению ответные сообщения должны отправляться напрямую.
Для гарантии доставки ответного сообщения при рестартах таблетки отправившей запрос используются `TEvProposeTransactionAttach` и `TEvProposeTransactionAttachResult`.
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/events/global.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 97 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.cpp | 43 | ||||
-rw-r--r-- | ydb/core/persqueue/transaction.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pqtablet_ut.cpp | 16 | ||||
-rw-r--r-- | ydb/core/protos/counters_schemeshard.proto | 2 | ||||
-rw-r--r-- | ydb/core/protos/pqconfig.proto | 12 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_common.cpp | 172 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_common.h | 20 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_part.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 2 |
17 files changed, 195 insertions, 239 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 1d07df03c15..afac59d7681 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2232,7 +2232,7 @@ private: } transaction.SetImmediate(ImmediateTx); - ActorIdToProto(SelfId(), ev->Record.MutableActor()); + ActorIdToProto(SelfId(), ev->Record.MutableSourceActor()); ev->Record.MutableData()->Swap(&transaction); ev->Record.SetTxId(TxId); diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index 6caf7858267..9b0e97ef120 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -6,6 +6,7 @@ #include <library/cpp/actors/core/actorid.h> #include <ydb/core/base/blobstorage.h> #include <ydb/core/protos/msgbus.pb.h> +#include <ydb/core/tx/datashard/datashard.h> #include <ydb/public/api/protos/draft/persqueue_common.pb.h> namespace NKikimr { @@ -241,5 +242,8 @@ struct TEvPersQueue { struct TEvPeriodicTopicStats : public TEventPB<TEvPeriodicTopicStats, NKikimrPQ::TEvPeriodicTopicStats, EvPeriodicTopicStats> { }; + + using TEvProposeTransactionAttach = TEvDataShard::TEvProposeTransactionAttach; + using TEvProposeTransactionAttachResult = TEvDataShard::TEvProposeTransactionAttachResult; }; } //NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 8e4b8affb6f..7b29a9c202b 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -76,7 +76,7 @@ void TPartition::ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode) { - ctx.Send(ActorIdFromProto(event.GetActor()), + ctx.Send(ActorIdFromProto(event.GetSourceActor()), MakeReplyPropose(event, statusCode).Release()); } @@ -1998,7 +1998,7 @@ void TPartition::ScheduleReplyError(const ui64 dst, void TPartition::ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode) { - Replies.emplace_back(ActorIdFromProto(event.GetActor()), + Replies.emplace_back(ActorIdFromProto(event.GetSourceActor()), MakeReplyPropose(event, statusCode).Release()); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index b285e24565f..1c7611d320b 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -30,8 +30,6 @@ static constexpr ui32 CACHE_SIZE = 100_MB; static constexpr ui32 MAX_BYTES = 25_MB; static constexpr ui32 MAX_SOURCE_ID_LENGTH = 10_KB; -constexpr const auto InvalidTabletId = Max<ui64>(); - struct TPartitionInfo { TPartitionInfo(const TActorId& actor, TMaybe<TPartitionKeyRange>&& keyRange, const bool initDone, const TTabletCountersBase& baseline) @@ -600,21 +598,6 @@ struct TPersQueue::TReplyToActor { TEventBasePtr Event; }; -struct TPersQueue::TReplyToPipe { - using TEventBasePtr = std::unique_ptr<IEventBase>; - - TReplyToPipe(ui64 tabletId, ui64 txId, TEventBasePtr event) : - TabletId(tabletId), - TxId(txId), - Event(std::move(event)) - { - } - - ui64 TabletId; - ui64 TxId; - TEventBasePtr Event; -}; - void TPersQueue::ReplyError(const TActorContext& ctx, const ui64 responseCookie, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) { ReplyPersQueueError( @@ -2412,7 +2395,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc HandleConfigTransaction(ev->Release(), ctx); break; case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET: - SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()), + SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()), event.GetTxId(), ctx); break; @@ -2446,7 +2429,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact } if (TabletState != NKikimrPQ::ENormal) { - SendProposeTransactionAbort(ActorIdFromProto(event.GetActor()), + SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()), event.GetTxId(), ctx); return; @@ -2683,26 +2666,26 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) EvProposeTransactionQueue.pop_front(); const NKikimrPQ::TEvProposeTransaction& event = front->Record; - TDistributedTransaction& tx = Txs[event.GetTxId()]; + switch (tx.State) { - case NKikimrPQ::TTransaction::UNKNOWN: { + case NKikimrPQ::TTransaction::UNKNOWN: tx.OnProposeTransaction(event, GetAllowedStep(), TabletID()); CheckTxState(ctx, tx); break; - } - case NKikimrPQ::TTransaction::PREPARING: { - break; - } - case NKikimrPQ::TTransaction::PREPARED: { + case NKikimrPQ::TTransaction::PREPARING: + case NKikimrPQ::TTransaction::PREPARED: + // + // the sender re-sent the TEvProposeTransaction. the actor ID could have changed. you need to + // update the ID in the transaction and schedule a response to the new actor + // + tx.SourceActor = ActorIdFromProto(event.GetSourceActor()); ScheduleProposeTransactionResult(tx); break; - } - default: { + default: Y_FAIL(); } - } } } @@ -2752,10 +2735,6 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx) " Transaction already planned for step " << tx.Step << ", Step: " << step << ", TxId: " << txId); - - if (tx.State > NKikimrPQ::TTransaction::PLANNING) { - SendEvProposeTransactionResult(ctx, tx); - } } } else { LOG_WARN_S(ctx, NKikimrServices::PERSQUEUE, @@ -2861,11 +2840,7 @@ void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction& event->Record.MutableDomainCoordinators()->CopyFrom(ProcessingParams->GetCoordinators()); } - if (tx.SourceTablet != InvalidTabletId) { - RepliesToPipe.emplace_back(tx.SourceTablet, tx.TxId, std::move(event)); - } else { - RepliesToActor.emplace_back(tx.SourceActor, std::move(event)); - } + RepliesToActor.emplace_back(tx.SourceActor, std::move(event)); } void TPersQueue::SchedulePlanStepAck(ui64 step, @@ -2992,11 +2967,8 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx, result->Record.SetTxId(tx.TxId); result->Record.SetStep(tx.Step); - if (tx.SourceTablet != InvalidTabletId) { - SendToPipe(tx.SourceTablet, tx, std::move(result), ctx); - } else { - ctx.Send(tx.SourceActor, std::move(result)); - } + + ctx.Send(tx.SourceActor, std::move(result)); } void TPersQueue::SendToPipe(ui64 tabletId, @@ -3232,28 +3204,12 @@ void TPersQueue::DeleteTx(TDistributedTransaction& tx) void TPersQueue::SendReplies(const TActorContext& ctx) { - SendRepliesToActors(ctx); - SendRepliesToPipes(ctx); -} - -void TPersQueue::SendRepliesToActors(const TActorContext& ctx) -{ for (auto& [actorId, event] : RepliesToActor) { ctx.Send(actorId, event.release()); } RepliesToActor.clear(); } -void TPersQueue::SendRepliesToPipes(const TActorContext& ctx) -{ - for (auto& [tabletId, txId, event] : RepliesToPipe) { - auto* tx = GetTransaction(ctx, txId); - Y_VERIFY(tx); - SendToPipe(tabletId, *tx, std::move(event), ctx); - } - RepliesToPipe.clear(); -} - void TPersQueue::CheckChangedTxStates(const TActorContext& ctx) { for (ui64 txId : ChangedTxs) { @@ -3460,6 +3416,28 @@ void TPersQueue::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext } } +void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx) +{ + const ui64 txId = ev->Get()->Record.GetTxId(); + NKikimrProto::EReplyStatus status = NKikimrProto::NODATA; + + auto tx = GetTransaction(ctx, txId); + if (tx) { + // + // the actor's ID could have changed from the moment he sent the TEvProposeTransaction. you need to + // update the actor ID in the transaction + // + // if the transaction has progressed beyond WAIT_RS, then a response has been sent to the sender + // + tx->SourceActor = ev->Sender; + if (tx->State <= NKikimrPQ::TTransaction::WAIT_RS) { + status = NKikimrProto::OK; + } + } + + ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie); +} + bool TPersQueue::HandleHook(STFUNC_SIG) { SetActivityType(NKikimrServices::TActivity::PERSQUEUE_ACTOR); @@ -3497,6 +3475,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG) HFuncTraced(TEvPQ::TEvProposePartitionConfigResult, Handle); HFuncTraced(TEvPQ::TEvTxCommitDone, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); + HFuncTraced(TEvPersQueue::TEvProposeTransactionAttach, Handle); default: return false; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index 912a9c4f680..0836d6261d1 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -239,10 +239,8 @@ private: bool WriteTxsInProgress = false; struct TReplyToActor; - struct TReplyToPipe; TVector<TReplyToActor> RepliesToActor; - TVector<TReplyToPipe> RepliesToPipe; TIntrusivePtr<NTabletPipe::TBoundedClientCacheConfig> PipeClientCacheConfig; THolder<NTabletPipe::IClientCache> PipeClientCache; @@ -289,8 +287,6 @@ private: void DeleteTx(TDistributedTransaction& tx); void SendReplies(const TActorContext& ctx); - void SendRepliesToActors(const TActorContext& ctx); - void SendRepliesToPipes(const TActorContext& ctx); void CheckChangedTxStates(const TActorContext& ctx); bool AllTransactionsHaveBeenProcessed() const; @@ -355,6 +351,8 @@ private: void InitProcessingParams(const TActorContext& ctx); TMaybe<NKikimrSubDomains::TProcessingParams> ProcessingParams; + + void Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr& ev, const TActorContext& ctx); }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 1454c66de13..771f1c52fbf 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -34,12 +34,8 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& tx.GetAggrPredicate() ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT; } - if (tx.HasTablet()) { - SourceTablet = tx.GetTablet(); - } else { - Y_VERIFY(tx.HasActor()); - SourceActor = ActorIdFromProto(tx.GetActor()); - } + Y_VERIFY(tx.HasSourceActor()); + SourceActor = ActorIdFromProto(tx.GetSourceActor()); } void TDistributedTransaction::InitDataTransaction(const NKikimrPQ::TTransaction& tx) @@ -66,10 +62,6 @@ void TDistributedTransaction::InitPartitions(const google::protobuf::RepeatedPtr void TDistributedTransaction::InitConfigTransaction(const NKikimrPQ::TTransaction& tx) { - Y_VERIFY(tx.HasSchemeShardId()); - - Receivers.insert(tx.GetSchemeShardId()); - TabletConfig = tx.GetTabletConfig(); BootstrapConfig = tx.GetBootstrapConfig(); @@ -96,7 +88,6 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr ui64 extractTabletId) { Y_VERIFY(event.GetTxBodyCase() != NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET); - Y_VERIFY(event.GetSourceCase() != NKikimrPQ::TEvProposeTransaction::SOURCE_NOT_SET); Y_VERIFY(TxId == Max<ui64>()); TxId = event.GetTxId(); @@ -118,18 +109,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TEvProposeTr Y_FAIL_S("unknown TxBody case"); } - switch (event.GetSourceCase()) { - case NKikimrPQ::TEvProposeTransaction::kActor: - Y_VERIFY(event.HasActor()); - SourceActor = ActorIdFromProto(event.GetActor()); - break; - case NKikimrPQ::TEvProposeTransaction::kTablet: - Y_VERIFY(event.HasTablet()); - SourceTablet = event.GetTablet(); - break; - default: - Y_FAIL_S("unknown Source case"); - } + Y_VERIFY(event.HasSourceActor()); + SourceActor = ActorIdFromProto(event.GetSourceActor()); } void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransaction& txBody, @@ -159,12 +140,8 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTransaction& txBody) { - Y_VERIFY(txBody.HasSchemeShardId()); - Kind = NKikimrPQ::TTransaction::KIND_CONFIG; - Receivers.insert(txBody.GetSchemeShardId()); - TabletConfig = txBody.GetTabletConfig(); BootstrapConfig = txBody.GetBootstrapConfig(); @@ -313,12 +290,8 @@ void TDistributedTransaction::AddCmdWrite(NKikimrClient::TKeyValueRequest& reque Y_FAIL_S("unknown transaction type"); } - if (SourceTablet != Max<ui64>()) { - tx.SetTablet(SourceTablet); - } else { - Y_VERIFY(SourceActor != TActorId()); - ActorIdToProto(SourceActor, tx.MutableActor()); - } + Y_VERIFY(SourceActor != TActorId()); + ActorIdToProto(SourceActor, tx.MutableSourceActor()); TString value; Y_VERIFY(tx.SerializeToString(&value)); @@ -347,10 +320,6 @@ void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx) void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx) { - Y_VERIFY(Receivers.size() == 1); - - tx.SetSchemeShardId(*Receivers.begin()); - *tx.MutableTabletConfig() = TabletConfig; *tx.MutableBootstrapConfig() = BootstrapConfig; } diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 8e08f29ad46..5ffeb1704b0 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -51,7 +51,6 @@ struct TDistributedTransaction { EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; NActors::TActorId SourceActor; // отправитель TEvProposeTransaction - ui64 SourceTablet = Max<ui64>(); THashSet<ui32> Partitions; // список участвующих партиций size_t PartitionRepliesCount = 0; diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 16df7691622..870c70d8f2a 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -731,7 +731,7 @@ void TPartitionFixture::SendProposeTransactionRequest(ui32 partition, { auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); - ActorIdToProto(Ctx->Edge, event->Record.MutableActor()); + ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor()); auto* body = event->Record.MutableData(); auto* operation = body->MutableOperations()->Add(); operation->SetPartitionId(partition); diff --git a/ydb/core/persqueue/ut/pqtablet_ut.cpp b/ydb/core/persqueue/ut/pqtablet_ut.cpp index 1013ccfd26f..0bbb72e1654 100644 --- a/ydb/core/persqueue/ut/pqtablet_ut.cpp +++ b/ydb/core/persqueue/ut/pqtablet_ut.cpp @@ -38,7 +38,6 @@ struct TTxOperation { struct TConfigParams { TMaybe<NKikimrPQ::TPQTabletConfig> Tablet; TMaybe<NKikimrPQ::TBootstrapConfig> Bootstrap; - ui64 SchemeShardId = 0; }; struct TProposeTransactionParams { @@ -216,7 +215,7 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); THashSet<ui32> partitions; - ActorIdToProto(Ctx->Edge, event->Record.MutableActor()); + ActorIdToProto(Ctx->Edge, event->Record.MutableSourceActor()); event->Record.SetTxId(params.TxId); if (params.Configs) { @@ -224,9 +223,6 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa // TxBody.Config // auto* body = event->Record.MutableConfig(); - if (params.Configs->SchemeShardId) { - body->SetSchemeShardId(params.Configs->SchemeShardId); - } if (params.Configs->Tablet.Defined()) { *body->MutableTabletConfig() = *params.Configs->Tablet; } @@ -750,7 +746,6 @@ Y_UNIT_TEST_F(DropTablet_Before_Write, TPQTabletFixture) Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture) { - NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222); PQTabletPrepare({.partitions=2}, {}, *Ctx); const ui64 txId = 67890; @@ -765,7 +760,6 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture) .Configs=NHelpers::TConfigParams{ .Tablet=tabletConfig, .Bootstrap=NHelpers::MakeBootstrapConfig(), - .SchemeShardId = 22222 }}); WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED}); @@ -775,16 +769,12 @@ Y_UNIT_TEST_F(UpdateConfig_1, TPQTabletFixture) WaitPlanStepAck({.Step=100, .TxIds={txId}}); WaitPlanStepAccepted({.Step=100}); - WaitReadSet(*schemeshard, {.Step=100, .TxId=txId, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId}); - schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId, .Source=Ctx->TabletId}); - WaitProposeTransactionResponse({.TxId=txId, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); } Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) { - NHelpers::TPQTabletMock* schemeshard = CreatePQTabletMock(22222); PQTabletPrepare({.partitions=2}, {}, *Ctx); const ui64 txId_2 = 67891; @@ -801,7 +791,6 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) .Configs=NHelpers::TConfigParams{ .Tablet=tabletConfig, .Bootstrap=NHelpers::MakeBootstrapConfig(), - .SchemeShardId = 22222 }}); SendProposeTransactionRequest({.TxId=txId_3, .TxOps={ @@ -819,9 +808,6 @@ Y_UNIT_TEST_F(UpdateConfig_2, TPQTabletFixture) WaitPlanStepAck({.Step=100, .TxIds={txId_2, txId_3}}); WaitPlanStepAccepted({.Step=100}); - WaitReadSet(*schemeshard, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId, .Target=22222, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Producer=Ctx->TabletId}); - schemeshard->SendReadSetAck(*Ctx->Runtime, {.Step=100, .TxId=txId_2, .Source=Ctx->TabletId}); - WaitProposeTransactionResponse({.TxId=txId_2, .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE}); WaitProposeTransactionResponse({.TxId=txId_3, diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index dba8eea5d9c..5b6843dfda8 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -520,5 +520,5 @@ enum ETxTypes { TXTYPE_CDC_STREAM_SCAN_PROGRESS = 82 [(TxTypeOpts) = {Name: "TxCdcStreamScanProgress"}]; TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}]; - TXTYPE_READSETDATA = 84 [(TxTypeOpts) = {Name: "TxReadSetData"}]; + TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}]; } diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index b9b438290f3..66745b10d04 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -820,16 +820,12 @@ message TDataTransaction { } message TConfigTransaction { - optional uint64 SchemeShardId = 1; optional TPQTabletConfig TabletConfig = 2; optional TBootstrapConfig BootstrapConfig = 3; } message TEvProposeTransaction { - oneof Source { - NActorsProto.TActorId Actor = 1; - uint64 Tablet = 5; - } + optional NActorsProto.TActorId SourceActor = 1; optional uint64 TxId = 2; oneof TxBody { TDataTransaction Data = 3; @@ -920,15 +916,11 @@ message TTransaction { // optional TPQTabletConfig TabletConfig = 12; optional TBootstrapConfig BootstrapConfig = 13; - optional uint64 SchemeShardId = 16; // // получатель результата // - oneof Source { - NActorsProto.TActorId Actor = 14; - uint64 Tablet = 15; - } + optional NActorsProto.TActorId SourceActor = 14; }; message TTabletTxInfo { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 5797ec4b014..3ade64ef64a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -383,28 +383,6 @@ void OutOfScopeEventHandler<TEvDataShard::TEvSchemaChanged>(const TEvDataShard:: context.OnComplete.Send(ackTo, event.Release()); } -// -// PQ tablet sends TEvTxProcessing::TEvReadSet and is waiting for a response. If there is no response, the -// transaction for changing the config will not be completed. As a result, the execution of other transactions -// in the queue will stop on the tablet side of the PQ -// -template <> -void OutOfScopeEventHandler<TEvTxProcessing::TEvReadSet>(const TEvTxProcessing::TEvReadSet::TPtr& ev, TOperationContext& context) -{ - const auto txId = ev->Get()->Record.GetTxId(); - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "TTxOperationReply<" << ev->GetTypeName() << "> execute" - << ", at schemeshard: " << context.SS->TabletID() - << ", send out-of-scope reply, for txId " << txId - ); - - const TActorId ackTo = ev->Sender; - const TTabletId ssId = context.SS->SelfTabletId(); - - auto ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), ui64(ssId)); - context.OnComplete.Send(ackTo, ack.release()); -} - template <class TEvType> struct TTxTypeFrom; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 9b266eb70f0..1d6b4428e17 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -595,9 +595,9 @@ bool CollectProposeTransactionResults(const TOperationId& operationId, return CollectProposeTxResults(ev, operationId, context, prepared, toString); } -bool CollectSchemaChanged(const TOperationId& operationId, - const TEvPersQueue::TEvProposeTransactionResult::TPtr& ev, - TOperationContext& context) +bool CollectPQConfigChanged(const TOperationId& operationId, + const TEvPersQueue::TEvProposeTransactionResult::TPtr& ev, + TOperationContext& context) { Y_VERIFY(context.SS->FindTx(operationId)); TTxState& txState = *context.SS->FindTx(operationId); @@ -615,7 +615,7 @@ bool CollectSchemaChanged(const TOperationId& operationId, txState.ShardsInProgress.erase(shardIdx); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "CollectSchemaChanged accept TEvPersQueue::TEvProposeTransactionResult" + "CollectPQConfigChanged accept TEvPersQueue::TEvProposeTransactionResult" << ", operationId: " << operationId << ", shardIdx: " << shardIdx << ", shard: " << shardId @@ -628,6 +628,57 @@ bool CollectSchemaChanged(const TOperationId& operationId, return txState.ShardsInProgress.empty(); } +bool CollectPQConfigChanged(const TOperationId& operationId, + const TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, + TOperationContext& context) +{ + Y_VERIFY(context.SS->FindTx(operationId)); + TTxState& txState = *context.SS->FindTx(operationId); + + // + // The PQ tablet can perform a transaction and send a TEvProposeTransactionResult(COMPLETE) response. + // The SchemeShard tablet can restart at this point. After restarting at the TPropose step, it will + // send the TEvProposeTransactionAttach message to the PQ tablets. If the NODATA status is specified in + // the response TEvProposeTransactionAttachResult, then the PQ tablet has already completed the transaction. + // Otherwise, she continues to execute the transaction + // + + const auto& evRecord = ev->Get()->Record; + if (evRecord.GetStatus() != NKikimrProto::NODATA) { + // + // If the PQ tablet returned something other than NODATA, then it continues to execute the transaction + // + return txState.ShardsInProgress.empty(); + } + + // + // Otherwise, she has already completed the transaction and has forgotten about it. Then we can + // remove PQ tablet from the list of shards + // + + const auto ssId = context.SS->SelfTabletId(); + const TTabletId shardId(evRecord.GetTabletId()); + + const auto shardIdx = context.SS->MustGetShardIdx(shardId); + Y_VERIFY(context.SS->ShardInfos.contains(shardIdx)); + + Y_VERIFY(txState.State == TTxState::Propose); + + txState.ShardsInProgress.erase(shardIdx); + + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "CollectPQConfigChanged accept TEvPersQueue::TEvProposeTransactionAttachResult" + << ", operationId: " << operationId + << ", shardIdx: " << shardIdx + << ", shard: " << shardId + << ", left await: " << txState.ShardsInProgress.size() + << ", txState.State: " << TTxState::StateName(txState.State) + << ", txState.ReadyForNotifications: " << txState.ReadyForNotifications + << ", at schemeshard: " << ssId); + + return txState.ShardsInProgress.empty(); +} + bool TConfigureParts::HandleReply(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) { const TTabletId ssId = context.SS->SelfTabletId(); @@ -649,13 +700,11 @@ THolder<TEvPersQueue::TEvProposeTransaction> TConfigureParts::MakeEvProposeTrans const TString& databaseId, const TString& databasePath, TTxState::ETxType txType, - TTabletId ssId, const TOperationContext& context) { auto event = MakeHolder<TEvPersQueue::TEvProposeTransaction>(); event->Record.SetTxId(ui64(txId)); - event->Record.SetTablet(ui64(ssId)); - event->Record.MutableConfig()->SetSchemeShardId(ui64(ssId)); + ActorIdToProto(context.SS->SelfId(), event->Record.MutableSourceActor()); MakePQTabletConfig(*event->Record.MutableConfig()->MutableTabletConfig(), pqGroup, @@ -723,29 +772,29 @@ bool TPropose::HandleReply(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev, << ", at schemeshard: " << ssId << " message# " << evRecord.ShortDebugString()); - const bool collected = CollectSchemaChanged(OperationId, ev, context); + const bool collected = CollectPQConfigChanged(OperationId, ev, context); LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " HandleReply TEvProposeTransactionResult" - << " CollectSchemaChanged: " << (collected ? "true" : "false")); + << " CollectPQConfigChanged: " << (collected ? "true" : "false")); return TryPersistState(context); } - -bool TPropose::HandleReply(TEvTxProcessing::TEvReadSet::TPtr& ev, TOperationContext& context) + +bool TPropose::HandleReply(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, TOperationContext& context) { - const ui64 tabletId = ev->Get()->Record.GetTabletProducer(); - if (auto p = ReadSetAcks.find(tabletId); p != ReadSetAcks.end()) { - TReadSetAck& ack = p->second; + const auto ssId = context.SS->SelfTabletId(); + const auto& evRecord = ev->Get()->Record; - if (!ack.Event) { - ++ReadSetCount; - } + LOG_INFO_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvProposeTransactionAttachResult" + << " triggers early" + << ", at schemeshard: " << ssId + << " message# " << evRecord.ShortDebugString()); - ack.Event = - MakeHolder<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), - ui64(context.SS->SelfTabletId())); - ack.Receiver = ev->Sender; - } + const bool collected = CollectPQConfigChanged(OperationId, ev, context); + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " HandleReply TEvProposeTransactionAttachResult" + << " CollectPQConfigChanged: " << (collected ? "true" : "false")); return TryPersistState(context); } @@ -754,9 +803,6 @@ void TPropose::PrepareShards(TTxState& txState, TSet<TTabletId>& shardSet, TOper { txState.UpdateShardsInProgress(); - ReadSetAcks.clear(); - ReadSetCount = 0; - for (const auto& shard : txState.Shards) { const TShardIdx idx = shard.Idx; // @@ -769,68 +815,86 @@ void TPropose::PrepareShards(TTxState& txState, TSet<TTabletId>& shardSet, TOper shardSet.insert(tablet); - auto& ack = ReadSetAcks[ui64(tablet)]; - ack.Event = nullptr; - ack.Receiver = TActorId(); + // + // By this point, the SchemeShard tablet could restart and the actor ID changed. Therefore, we send + // the TEvProposeTransactionAttach message to the PQ tablets so that they recognize the new recipient + // + SendEvProposeTransactionAttach(idx, tablet, context); } else { txState.ShardsInProgress.erase(idx); } } } -bool TPropose::TryPersistState(TOperationContext& context) +void TPropose::SendEvProposeTransactionAttach(TShardIdx shard, TTabletId tablet, + TOperationContext& context) { - if (ReadSetCount < ReadSetAcks.size()) { - return false; - } - - TTxState* txState = context.SS->FindTx(OperationId); - Y_VERIFY(txState); + auto event = + MakeHolder<TEvPersQueue::TEvProposeTransactionAttach>(ui64(tablet), + ui64(OperationId.GetTxId())); + context.OnComplete.BindMsgToPipe(OperationId, tablet, shard, event.Release()); +} - if (!txState->ShardsInProgress.empty()) { +bool TPropose::CanPersistState(const TTxState& txState, + TOperationContext& context) +{ + if (!txState.ShardsInProgress.empty()) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " can't persist state: " << + "ShardsInProgress is not empty, remain: " << txState.ShardsInProgress.size()); return false; } - const TPathId pathId = txState->TargetPathId; - const TPathElement::TPtr path = context.SS->PathsById.at(pathId); + PathId = txState.TargetPathId; + Path = context.SS->PathsById.at(PathId); - if (path->StepCreated == InvalidStepId) { + if (Path->StepCreated == InvalidStepId) { + LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + DebugHint() << " can't persist state: " << + "StepCreated is invalid"); return false; } + return true; +} + +void TPropose::PersistState(const TTxState& txState, + TOperationContext& context) const +{ NIceDb::TNiceDb db(context.GetDB()); - if (txState->TxType == TTxState::TxCreatePQGroup || txState->TxType == TTxState::TxAllocatePQ) { - auto parentDir = context.SS->PathsById.at(path->ParentPathId); + if (txState.TxType == TTxState::TxCreatePQGroup || txState.TxType == TTxState::TxAllocatePQ) { + auto parentDir = context.SS->PathsById.at(Path->ParentPathId); ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); } - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + context.SS->ClearDescribePathCaches(Path); + context.OnComplete.PublishToSchemeBoard(OperationId, PathId); - TTopicInfo::TPtr pqGroup = context.SS->Topics[pathId]; + TTopicInfo::TPtr pqGroup = context.SS->Topics[PathId]; pqGroup->FinishAlter(); - context.SS->PersistPersQueueGroup(db, pathId, pqGroup); - context.SS->PersistRemovePersQueueGroupAlter(db, pathId); + context.SS->PersistPersQueueGroup(db, PathId, pqGroup); + context.SS->PersistRemovePersQueueGroupAlter(db, PathId); context.SS->ChangeTxState(db, OperationId, TTxState::Done); - - SendEvReadSetAck(context); - - return true; } -void TPropose::SendEvReadSetAck(TOperationContext& context) +bool TPropose::TryPersistState(TOperationContext& context) { - for (auto& [_, ack] : ReadSetAcks) { - Y_VERIFY(ack.Event); + TTxState* txState = context.SS->FindTx(OperationId); + Y_VERIFY(txState); - context.Ctx.Send(ack.Receiver, ack.Event.Release()); + if (!CanPersistState(*txState, context)) { + return false; } + + PersistState(*txState, context); + + return true; } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index b9d2b83e5a3..c0addf8c929 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -686,7 +686,6 @@ public: databaseId, databasePath, txState->TxType, - ssId, context); } else { event = MakeEvUpdateConfig(OperationId.GetTxId(), @@ -851,7 +850,6 @@ private: const TString& databaseId, const TString& databasePath, TTxState::ETxType txType, - TTabletId ssId, const TOperationContext& context); static THolder<TEvPersQueue::TEvUpdateConfig> MakeEvUpdateConfig(TTxId txId, @@ -885,7 +883,7 @@ public: } bool HandleReply(TEvPersQueue::TEvProposeTransactionResult::TPtr& ev, TOperationContext& context) override; - bool HandleReply(TEvTxProcessing::TEvReadSet::TPtr& ev, TOperationContext& context) override; + bool HandleReply(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, TOperationContext& context) override; bool HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) override { TStepId step = TStepId(ev->Get()->StepId); @@ -945,18 +943,18 @@ public: } private: + bool CanPersistState(const TTxState& txState, + TOperationContext& context); + void PersistState(const TTxState& txState, + TOperationContext& context) const; bool TryPersistState(TOperationContext& context); - void SendEvReadSetAck(TOperationContext& context); + void SendEvProposeTransactionAttach(TShardIdx shard, TTabletId tablet, + TOperationContext& context); void PrepareShards(TTxState& txState, TSet<TTabletId>& shardSet, TOperationContext& context); - struct TReadSetAck { - THolder<TEvTxProcessing::TEvReadSetAck> Event; - TActorId Receiver; - }; - - THashMap<ui64, TReadSetAck> ReadSetAcks; - size_t ReadSetCount = 0; + TPathId PathId; + TPathElement::TPtr Path; }; } // NPQState diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_part.h b/ydb/core/tx/schemeshard/schemeshard__operation_part.h index 432d3e68643..441d19b6b60 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_part.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_part.h @@ -75,7 +75,7 @@ action(TEvPrivate::TEvPrivate::TEvCompletePublication, NSchemeShard::TXTYPE_NOTIFY_OPERATION_COMPLETE_PUBLICATION) \ action(TEvPrivate::TEvPrivate::TEvCompleteBarrier, NSchemeShard::TXTYPE_NOTIFY_OPERATION_COMPLETE_BARRIER) \ \ - action(TEvTxProcessing::TEvReadSet, NSchemeShard::TXTYPE_READSETDATA) + action(TEvPersQueue::TEvProposeTransactionAttachResult, NSchemeShard::TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT) namespace NKikimr { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index cacaa80aa3e..0ca6dabc396 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4381,7 +4381,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvSchemeShard::TEvLogin, Handle); - HFuncTraced(TEvTxProcessing::TEvReadSet, Handle); + HFuncTraced(TEvPersQueue::TEvProposeTransactionAttachResult, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { @@ -4949,36 +4949,25 @@ void TSchemeShard::Handle(TEvPrivate::TEvProgressOperation::TPtr &ev, const TAct Execute(CreateTxOperationProgress(TOperationId(txId, ev->Get()->TxPartId)), ctx); } -void TSchemeShard::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx) +void TSchemeShard::Handle(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, const TActorContext& ctx) { - auto sendReadSetAck = [&]() { - auto ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID()); - ctx.Send(ev->Sender, ack.release()); - }; - const auto txId = TTxId(ev->Get()->Record.GetTxId()); if (!Operations.contains(txId)) { LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Got TEvTxProcessing::TEvReadSet" - << " for unknown txId " << txId - << " message " << ev->Get()->Record.ShortDebugString()); - - sendReadSetAck(); - + "Got TEvPersQueue::TEvProposeTransactionAttachResult" + << " for unknown txId: " << txId + << " message: " << ev->Get()->Record.ShortDebugString()); return; } - const TTabletId tabletId(ev->Get()->Record.GetTabletSource()); - const TSubTxId partId = Operations.at(txId)->FindRelatedPartByTabletId(tabletId, ctx); + auto tabletId = TTabletId(ev->Get()->Record.GetTabletId()); + TSubTxId partId = Operations.at(txId)->FindRelatedPartByTabletId(tabletId, ctx); if (partId == InvalidSubTxId) { LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Got TEvProposeTransactionResult but partId is unknown" + "Got TEvPersQueue::TEvProposeTransactionAttachResult but partId is unknown" << ", for txId: " << txId << ", tabletId: " << tabletId << ", at schemeshard: " << TabletID()); - - sendReadSetAck(); - return; } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 70efa73fc33..a96d2218087 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -930,7 +930,7 @@ public: void Handle(TEvPrivate::TEvProgressOperation::TPtr &ev, const TActorContext &ctx); - void Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvProposeTransactionAttachResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx); |