aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2024-12-24 15:46:52 +0300
committerGitHub <noreply@github.com>2024-12-24 15:46:52 +0300
commit5034cc844a2c120e29c92e9f26bd5e73971ba265 (patch)
tree535ca108d72014b41ab890050f00983501b3626c
parentea8ccc45dc32a339f4bb906b7ef02a72c3bbb93b (diff)
downloadydb-5034cc844a2c120e29c92e9f26bd5e73971ba265.tar.gz
Fixed errors with transactions in the PQ tablet (#12905)
-rw-r--r--ydb/core/persqueue/events/internal.h1
-rw-r--r--ydb/core/persqueue/partition.cpp5
-rw-r--r--ydb/core/persqueue/pq_impl.cpp61
3 files changed, 45 insertions, 22 deletions
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index 005e3a3245..6d23f435d9 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -821,6 +821,7 @@ struct TEvPQ {
ui64 TxId;
TVector<NKikimrPQ::TPartitionOperation> Operations;
TActorId SupportivePartitionActor;
+ bool ForcePredicateFalse = false;
};
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 17ac733b28..656d4ed8d9 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2154,6 +2154,11 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
TPartition::EProcessResult TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx, TMaybe<bool>& predicate)
{
+ if (tx.ForcePredicateFalse) {
+ predicate = false;
+ return EProcessResult::Continue;
+ }
+
THashSet<TString> consumers;
bool ok = true;
for (auto& operation : tx.Operations) {
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 8b4580192b..6ba9ccf022 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -3910,10 +3910,22 @@ TMaybe<TPartitionId> TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransacti
void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
TDistributedTransaction& tx)
{
+ auto OriginalPartitionExists = [this](ui32 partitionId) {
+ return Partitions.contains(TPartitionId(partitionId));
+ };
+
+ // if the predicate is violated, the transaction will end with the ABORTED code
+ bool forcePredicateFalse = false;
THashMap<ui32, std::unique_ptr<TEvPQ::TEvTxCalcPredicate>> events;
for (auto& operation : tx.Operations) {
ui32 originalPartitionId = operation.GetPartitionId();
+
+ if (!OriginalPartitionExists(originalPartitionId)) {
+ forcePredicateFalse = true;
+ continue;
+ }
+
auto& event = events[originalPartitionId];
if (!event) {
event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
@@ -3928,29 +3940,43 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
if (tx.WriteId.Defined()) {
const TWriteId& writeId = *tx.WriteId;
- Y_ABORT_UNLESS(TxWrites.contains(writeId),
- "PQ %" PRIu64 ", TxId %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
- TabletID(), tx.TxId, writeId.NodeId, writeId.KeyId);
- const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
+ if (TxWrites.contains(writeId)) {
+ const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
+
+ for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) {
+ if (!OriginalPartitionExists(originalPartitionId)) {
+ PQ_LOG_W("Unknown partition " << originalPartitionId << " for TxId " << tx.TxId);
+ forcePredicateFalse = true;
+ continue;
+ }
- for (auto& [originalPartitionId, partitionId] : writeInfo.Partitions) {
- Y_ABORT_UNLESS(Partitions.contains(partitionId));
- const TPartitionInfo& partition = Partitions.at(partitionId);
+ auto& event = events[originalPartitionId];
+ if (!event) {
+ event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
+ }
- auto& event = events[originalPartitionId];
- if (!event) {
- event = std::make_unique<TEvPQ::TEvTxCalcPredicate>(tx.Step, tx.TxId);
- }
+ if (!Partitions.contains(partitionId)) {
+ PQ_LOG_W("Unknown partition " << partitionId << " for TxId " << tx.TxId);
+ forcePredicateFalse = true;
+ continue;
+ }
+
+ const TPartitionInfo& partition = Partitions.at(partitionId);
- event->SupportivePartitionActor = partition.Actor;
+ event->SupportivePartitionActor = partition.Actor;
+ }
+ } else {
+ PQ_LOG_W("Unknown WriteId " << writeId << " for TxId " << tx.TxId);
+ forcePredicateFalse = true;
}
}
for (auto& [originalPartitionId, event] : events) {
TPartitionId partitionId(originalPartitionId);
- Y_ABORT_UNLESS(Partitions.contains(partitionId));
const TPartitionInfo& partition = Partitions.at(partitionId);
+ event->ForcePredicateFalse = forcePredicateFalse;
+
ctx.Send(partition.Actor, event.release());
}
@@ -4297,15 +4323,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS);
- //
- // the number of TEvReadSetAck sent should not be greater than the number of senders
- // from TEvProposeTransaction
- //
- Y_ABORT_UNLESS(tx.ReadSetAcks.size() <= tx.PredicatesReceived.size(),
- "PQ %" PRIu64 ", TxId %" PRIu64 ", ReadSetAcks.size %" PRISZT ", PredicatesReceived.size %" PRISZT,
- TabletID(), tx.TxId,
- tx.ReadSetAcks.size(), tx.PredicatesReceived.size());
-
SendEvReadSetToReceivers(ctx, tx);
if (tx.TxId != TxsOrder[tx.State].front()) {