diff options
author | ivanmorozov333 <[email protected]> | 2025-01-10 21:15:01 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-01-10 21:15:01 +0300 |
commit | af411bb10f1133d6e7f4c6324a89dde2f745d675 (patch) | |
tree | 11396a560428378a2f56cbcd28e98088ece2210e | |
parent | 6878f9bb53de1b292205d5d7917e048c34ebc517 (diff) |
strict writing data validation (#13190)
4 files changed, 27 insertions, 14 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp index 6c0803d33a7..c6f94262b63 100644 --- a/ydb/core/kqp/common/kqp_tx_manager.cpp +++ b/ydb/core/kqp/common/kqp_tx_manager.cpp @@ -278,17 +278,17 @@ public: for (auto& [shardId, shardInfo] : ShardsInfo) { if ((shardInfo.Flags & EAction::WRITE)) { ReceivingShards.insert(shardId); + if (shardInfo.IsOlap) { + receivingColumnShardsSet.insert(shardId); + } if (IsVolatile()) { SendingShards.insert(shardId); } - if (shardInfo.IsOlap) { - sendingColumnShardsSet.insert(shardId); - } } if (!shardInfo.Locks.empty()) { SendingShards.insert(shardId); if (shardInfo.IsOlap) { - receivingColumnShardsSet.insert(shardId); + sendingColumnShardsSet.insert(shardId); } } @@ -325,6 +325,7 @@ public: auto arbiterIterator = std::begin(shards); std::advance(arbiterIterator, index); ArbiterColumnShard = *arbiterIterator; + ReceivingShards.insert(*ArbiterColumnShard); } ShardsToWaitPrepare = ShardsIds; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index bcb66aa4975..cbd15448569 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2561,6 +2561,7 @@ private: auto arbiterIterator = std::begin(shards); std::advance(arbiterIterator, index); columnShardArbiter = *arbiterIterator; + receivingShardsSet.insert(*columnShardArbiter); } } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index a2b25407099..7cd1b8bf0d2 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -315,18 +315,28 @@ public: LockId = lock.GetLockId(); SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); - const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty(); - if (!singleShardTx) { + if (SendingShards.empty() != ReceivingShards.empty()) { + return TConclusionStatus::Fail("incorrect synchronization data (send/receiving lists)"); + } + if (ReceivingShards.size() && SendingShards.size()) { if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is absent in sending and receiving lists"); + return TConclusionStatus::Fail("current tablet_id is absent in sending and receiving lists"); } - if (locks.HasArbiterColumnShard()) { - ArbiterColumnShard = locks.GetArbiterColumnShard(); - } else { - AFL_VERIFY(!ReceivingShards.empty()); - ArbiterColumnShard = *ReceivingShards.begin(); + if (!locks.HasArbiterColumnShard()) { + return TConclusionStatus::Fail("no arbiter info in request"); + } + ArbiterColumnShard = locks.GetArbiterColumnShard(); + + if (IsPrimary() && !ReceivingShards.contains(ArbiterColumnShard)) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)( + "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards)); + return TConclusionStatus::Fail("arbiter is absent in receiving lists"); + } + if (!IsPrimary() && (!ReceivingShards.contains(ArbiterColumnShard) || !SendingShards.contains(ArbiterColumnShard))) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)( + "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards)); + return TConclusionStatus::Fail("arbiter is absent in sending or receiving lists"); } - AFL_VERIFY(ArbiterColumnShard); } Generation = lock.GetGeneration(); diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 8fc19f3c858..6ec0e70811b 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -159,7 +159,8 @@ private: }; virtual bool IsTxBroken() const override { - return TxBroken.value_or(false); + AFL_VERIFY(TxBroken); + return *TxBroken; } void InitializeRequests(TColumnShard& owner) { |