diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2024-12-24 15:46:52 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-24 15:46:52 +0300 |
commit | 5034cc844a2c120e29c92e9f26bd5e73971ba265 (patch) | |
tree | 535ca108d72014b41ab890050f00983501b3626c | |
parent | ea8ccc45dc32a339f4bb906b7ef02a72c3bbb93b (diff) | |
download | ydb-5034cc844a2c120e29c92e9f26bd5e73971ba265.tar.gz |
Fixed errors with transactions in the PQ tablet (#12905)
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 61 |
3 files changed, 45 insertions, 22 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 005e3a3245..6d23f435d9 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -821,6 +821,7 @@ struct TEvPQ { ui64 TxId; TVector<NKikimrPQ::TPartitionOperation> Operations; TActorId SupportivePartitionActor; + bool ForcePredicateFalse = false; }; struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 17ac733b28..656d4ed8d9 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2154,6 +2154,11 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t, TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicate) { + if (tx.ForcePredicateFalse) { + predicate = false; + return EProcessResult::Continue; + } + THashSet<TString> consumers; bool ok = true; for (auto& operation : tx.Operations) { diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 8b4580192b..6ba9ccf022 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -3910,10 +3910,22 @@ TMaybe<TPartitionId> TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransacti void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx, TDistributedTransaction& tx) { + auto OriginalPartitionExists = [this](ui32 partitionId) { + return Partitions.contains(TPartitionId(partitionId)); + }; + + // if the predicate is violated, the transaction will end with the ABORTED code + bool forcePredicateFalse = false; THashMap<ui32, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events; for (auto& operation : tx.Operations) { ui32 originalPartitionId = operation.GetPartitionId(); + + if (!OriginalPartitionExists(originalPartitionId)) { + forcePredicateFalse = true; + continue; + } + auto& event = events[originalPartitionId]; if (!event) { event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId); @@ -3928,29 +3940,43 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx, if (tx.WriteId.Defined()) { const TWriteId& writeId = *tx.WriteId; - Y_ABORT_UNLESS(TxWrites.contains(writeId), - "PQ %" PRIu64 ", TxId %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}", - TabletID(), tx.TxId, writeId.NodeId, writeId.KeyId); - const TTxWriteInfo& writeInfo = TxWrites.at(writeId); + if (TxWrites.contains(writeId)) { + const TTxWriteInfo& writeInfo = TxWrites.at(writeId); + + for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) { + if (!OriginalPartitionExists(originalPartitionId)) { + PQ_LOG_W("Unknown partition " << originalPartitionId << " for TxId " << tx.TxId); + forcePredicateFalse = true; + continue; + } - for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) { - Y_ABORT_UNLESS(Partitions.contains(partitionId)); - const TPartitionInfo& partition = Partitions.at(partitionId); + auto& event = events[originalPartitionId]; + if (!event) { + event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId); + } - auto& event = events[originalPartitionId]; - if (!event) { - event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId); - } + if (!Partitions.contains(partitionId)) { + PQ_LOG_W("Unknown partition " << partitionId << " for TxId " << tx.TxId); + forcePredicateFalse = true; + continue; + } + + const TPartitionInfo& partition = Partitions.at(partitionId); - event->SupportivePartitionActor = partition.Actor; + event->SupportivePartitionActor = partition.Actor; + } + } else { + PQ_LOG_W("Unknown WriteId " << writeId << " for TxId " << tx.TxId); + forcePredicateFalse = true; } } for (auto& [originalPartitionId, event] : events) { TPartitionId partitionId(originalPartitionId); - Y_ABORT_UNLESS(Partitions.contains(partitionId)); const TPartitionInfo& partition = Partitions.at(partitionId); + event->ForcePredicateFalse = forcePredicateFalse; + ctx.Send(partition.Actor, event.release()); } @@ -4297,15 +4323,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS); - // - // the number of TEvReadSetAck sent should not be greater than the number of senders - // from TEvProposeTransaction - // - Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.PredicatesReceived.size(), - "PQ %" PRIu64 ", TxId %" PRIu64 ", ReadSetAcks.size %" PRISZT ", PredicatesReceived.size %" PRISZT, - TabletID(), tx.TxId, - tx.ReadSetAcks.size(), tx.PredicatesReceived.size()); - SendEvReadSetToReceivers(ctx, tx); if (tx.TxId != TxsOrder[tx.State].front()) { |