summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <[email protected]>2025-01-10 21:15:01 +0300
committerGitHub <[email protected]>2025-01-10 21:15:01 +0300
commitaf411bb10f1133d6e7f4c6324a89dde2f745d675 (patch)
tree11396a560428378a2f56cbcd28e98088ece2210e
parent6878f9bb53de1b292205d5d7917e048c34ebc517 (diff)
strict writing data validation (#13190)
-rw-r--r--ydb/core/kqp/common/kqp_tx_manager.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp28
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h3
4 files changed, 27 insertions, 14 deletions
diff --git a/ydb/core/kqp/common/kqp_tx_manager.cpp b/ydb/core/kqp/common/kqp_tx_manager.cpp
index 6c0803d33a7..c6f94262b63 100644
--- a/ydb/core/kqp/common/kqp_tx_manager.cpp
+++ b/ydb/core/kqp/common/kqp_tx_manager.cpp
@@ -278,17 +278,17 @@ public:
for (auto& [shardId, shardInfo] : ShardsInfo) {
if ((shardInfo.Flags & EAction::WRITE)) {
ReceivingShards.insert(shardId);
+ if (shardInfo.IsOlap) {
+ receivingColumnShardsSet.insert(shardId);
+ }
if (IsVolatile()) {
SendingShards.insert(shardId);
}
- if (shardInfo.IsOlap) {
- sendingColumnShardsSet.insert(shardId);
- }
}
if (!shardInfo.Locks.empty()) {
SendingShards.insert(shardId);
if (shardInfo.IsOlap) {
- receivingColumnShardsSet.insert(shardId);
+ sendingColumnShardsSet.insert(shardId);
}
}
@@ -325,6 +325,7 @@ public:
auto arbiterIterator = std::begin(shards);
std::advance(arbiterIterator, index);
ArbiterColumnShard = *arbiterIterator;
+ ReceivingShards.insert(*ArbiterColumnShard);
}
ShardsToWaitPrepare = ShardsIds;
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index bcb66aa4975..cbd15448569 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2561,6 +2561,7 @@ private:
auto arbiterIterator = std::begin(shards);
std::advance(arbiterIterator, index);
columnShardArbiter = *arbiterIterator;
+ receivingShardsSet.insert(*columnShardArbiter);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index a2b25407099..7cd1b8bf0d2 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -315,18 +315,28 @@ public:
LockId = lock.GetLockId();
SendingShards = std::set<ui64>(locks.GetSendingShards().begin(), locks.GetSendingShards().end());
ReceivingShards = std::set<ui64>(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end());
- const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty();
- if (!singleShardTx) {
+ if (SendingShards.empty() != ReceivingShards.empty()) {
+ return TConclusionStatus::Fail("incorrect synchronization data (send/receiving lists)");
+ }
+ if (ReceivingShards.size() && SendingShards.size()) {
if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) {
- return TConclusionStatus::Fail("shard is absent in sending and receiving lists");
+ return TConclusionStatus::Fail("current tablet_id is absent in sending and receiving lists");
}
- if (locks.HasArbiterColumnShard()) {
- ArbiterColumnShard = locks.GetArbiterColumnShard();
- } else {
- AFL_VERIFY(!ReceivingShards.empty());
- ArbiterColumnShard = *ReceivingShards.begin();
+ if (!locks.HasArbiterColumnShard()) {
+ return TConclusionStatus::Fail("no arbiter info in request");
+ }
+ ArbiterColumnShard = locks.GetArbiterColumnShard();
+
+ if (IsPrimary() && !ReceivingShards.contains(ArbiterColumnShard)) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)(
+ "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards));
+ return TConclusionStatus::Fail("arbiter is absent in receiving lists");
+ }
+ if (!IsPrimary() && (!ReceivingShards.contains(ArbiterColumnShard) || !SendingShards.contains(ArbiterColumnShard))) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "incorrect arbiter")("arbiter_id", ArbiterColumnShard)(
+ "receiving", JoinSeq(", ", ReceivingShards))("sending", JoinSeq(", ", SendingShards));
+ return TConclusionStatus::Fail("arbiter is absent in sending or receiving lists");
}
- AFL_VERIFY(ArbiterColumnShard);
}
Generation = lock.GetGeneration();
diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
index 8fc19f3c858..6ec0e70811b 100644
--- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
+++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
@@ -159,7 +159,8 @@ private:
};
virtual bool IsTxBroken() const override {
- return TxBroken.value_or(false);
+ AFL_VERIFY(TxBroken);
+ return *TxBroken;
}
void InitializeRequests(TColumnShard& owner) {