about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Iuliia Sidorina <yulia@ydb.tech> 2025-03-04 12:07:39 +0100
committer GitHub <noreply@github.com> 2025-03-04 12:07:39 +0100
commit 9eea4e372ad51d5e90a0446e64a1751d2532214f (patch)
tree cecc9443feed25f67837cd667e48e0de999afc2d
parent 18f45b9e2316ffe02a1561d66e811d12f5abd8cc (diff)
download ydb-9eea4e372ad51d5e90a0446e64a1751d2532214f.tar.gz
feat(data_integrity_trails): add logging of acquired and broken locks (#13164)
-rw-r--r-- ydb/core/kqp/common/kqp_data_integrity_trails.h 97
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_data_executer.cpp 14
-rw-r--r-- ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp 156
-rw-r--r-- ydb/core/tx/datashard/datashard__read_iterator.cpp 4
-rw-r--r-- ydb/core/tx/datashard/datashard_integrity_trails.h 26
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_locks.cpp 2
-rw-r--r-- ydb/core/tx/datashard/execute_data_tx_unit.cpp 2
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp 7
-rw-r--r-- ydb/core/tx/datashard/execute_write_unit.cpp 7
-rw-r--r-- ydb/core/tx/locks/locks.cpp 12
-rw-r--r-- ydb/core/tx/locks/locks.h 2
11 files changed, 282 insertions, 47 deletions
diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h
index 71198204a5..28566a975d 100644
--- a/ydb/core/kqp/common/kqp_data_integrity_trails.h
+++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h
@@ -6,6 +6,8 @@
#include <library/cpp/string_utils/base64/base64.h>
#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
+#include <ydb/core/tx/data_events/events.h>
+#include <ydb/core/tx/datashard/datashard.h>
namespace NKikimr {
namespace NDataIntegrity {
@@ -97,23 +99,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction
}
// DataExecuter
-inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
- auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
+inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
+ auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) {
TStringStream ss;
LogKeyValue("Component", "Executer", ss);
+ LogKeyValue("Type", "Request", ss);
LogKeyValue("TraceId", traceId, ss);
LogKeyValue("PhyTxId", ToString(txId), ss);
+ LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss);
if (shardId) {
LogKeyValue("ShardId", ToString(*shardId), ss);
}
- LogKeyValue("Type", type, ss, /*last*/ true);
+ LogKeyValue("TxType", type, ss, /*last*/ true);
+
+ return ss.Str();
+ };
+
+ LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId));
+}
+
+inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) {
+ auto log = [](const auto& state, const auto& traceId, const auto& ev) {
+ const auto& record = ev->Get()->Record;
+
+ TStringStream ss;
+ LogKeyValue("Component", "Executer", ss);
+ LogKeyValue("Type", "Response", ss);
+ LogKeyValue("State", state, ss);
+ LogKeyValue("TraceId", traceId, ss);
+ LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
+ LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
+
+ TStringBuilder locksDebugStr;
+ locksDebugStr << "[";
+ for (const auto& lock : record.GetTxLocks()) {
+ locksDebugStr << lock.ShortDebugString() << " ";
+ }
+ locksDebugStr << "]";
+
+ LogKeyValue("Locks", locksDebugStr, ss);
+ LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss);
+
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetIssues(), issues);
+ LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true);
+
+ return ss.Str();
+ };
+
+ LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
+}
+
+inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) {
+ auto log = [](const auto& state, const auto& traceId, const auto& ev) {
+ const auto& record = ev->Get()->Record;
+
+ TStringStream ss;
+ LogKeyValue("Component", "Executer", ss);
+ LogKeyValue("Type", "Response", ss);
+ LogKeyValue("State", state, ss);
+ LogKeyValue("TraceId", traceId, ss);
+ LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss);
+ LogKeyValue("ShardId", ToString(record.GetOrigin()), ss);
+
+ TStringBuilder locksDebugStr;
+ locksDebugStr << "[";
+ for (const auto& lock : record.GetTxLocks()) {
+ locksDebugStr << lock.ShortDebugString() << " ";
+ }
+ locksDebugStr << "]";
+
+ LogKeyValue("Locks", locksDebugStr, ss);
+ LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss);
+ LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true);
+
+ return ss.Str();
+ };
+
+ LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev));
+}
+
+template <typename TActorResultInfo>
+inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) {
+ auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) {
+ TStringStream ss;
+ LogKeyValue("Component", "Executer", ss);
+ LogKeyValue("Type", type, ss);
+ LogKeyValue("TraceId", traceId, ss);
+ LogKeyValue("PhyTxId", ToString(txId), ss);
+
+ TStringBuilder locksDebugStr;
+ locksDebugStr << "[";
+ for (const auto& lock : info.GetLocks()) {
+ locksDebugStr << lock.ShortDebugString() << " ";
+ }
+ locksDebugStr << "]";
+
+ LogKeyValue("Locks", locksDebugStr, ss);
return ss.Str();
};
- LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
+ LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info));
}
// WriteActor,BufferActor
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index de55152154..aa6a1d1755 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -198,6 +198,7 @@ public:
if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) {
NKikimrTxDataShard::TEvKqpInputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
+ NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
@@ -216,6 +217,7 @@ public:
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings");
+ NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext());
for (auto& lock : info.GetLocks()) {
if (!TxManager) {
Locks.push_back(lock);
@@ -506,6 +508,7 @@ private:
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId);
+ NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
@@ -575,6 +578,7 @@ private:
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
+ NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1105,7 +1109,7 @@ private:
transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
}
- NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
+ NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards");
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
@@ -1246,6 +1250,7 @@ private:
NYql::TIssues issues;
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
+ NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
<< ", TxId=" << ev->Get()->Record.GetTxId()
@@ -1317,6 +1322,7 @@ private:
TShardState* shardState = ShardStates.FindPtr(shardId);
YQL_ENSURE(shardState);
+ NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext());
LOG_D("Got propose result, shard: " << shardId << ", status: "
<< NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus())
<< ", error: " << res->GetError());
@@ -1807,7 +1813,8 @@ private:
flags));
}
- NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
+ NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(),
+ Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
ResponseEv->Orbit.Fork(evData->Orbit);
ev = std::move(evData);
@@ -1843,7 +1850,8 @@ private:
auto traceId = ExecuterSpan.GetTraceId();
- NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
+ NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(),
+ Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
auto shardsToString = [](const auto& shards) {
TStringBuilder builder;
diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
index f8d362bb53..1625298ec2 100644
--- a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
+++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
@@ -1,5 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <regex>
+
namespace NKikimr {
namespace NKqp {
@@ -43,7 +45,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
}
// check executer logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0);
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0);
// check grpc logs
@@ -142,7 +144,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
// check write actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 3);
// check executer logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 4);
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 11);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
@@ -203,8 +205,8 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
- // check executer logs (should be empty, because executer only logs modification operations)
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0);
+ // check executer logs (should be 1, because executer only logs result for read actor)
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
// check session actor logs
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2);
// check grpc logs
@@ -213,45 +215,143 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0);
}
- Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) {
+ Y_UNIT_TEST(BrokenReadLock) {
TStringStream ss;
{
TKikimrSettings serverSettings;
serverSettings.LogStream = &ss;
TKikimrRunner kikimr(serverSettings);
kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
- NYdb::NScripting::TScriptingClient client(kikimr.GetDriver());
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ std::optional<TTransaction> tx1;
- const auto query = R"(
- --!syntax_v1
+ { // tx1: read
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
- UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
- (3u, "Value3"),
- (101u, "Value101"),
- (201u, "Value201");
- )";
+ SELECT * FROM `/Root/KeyValue` WHERE Key = 1u OR Key = 2u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // tx2: write + commit
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
- if (Streaming) {
- auto result = client.StreamExecuteYqlScript(query).GetValueSync();
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
- CollectStreamResult(result);
- } else {
- auto result = client.ExecuteYqlScript(query).GetValueSync();
+ }
+
+ { // tx1: commit
+ auto result = tx1->Commit().ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}
-
- // check executer logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1);
- // check session actor logs (should contain double logs because this query was executed via worker actor)
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4);
- // check grpc logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2);
- // check datashard logs
- UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2);
- Cout << ss.Str() << Endl;
+ auto logRows = SplitString(ss.Str(), "DATA_INTEGRITY");
+ std::string readLock;
+ std::string brokenLock;
+ for (const auto& row : logRows) {
+ // we need to find row with info about read physical tx and extract lock id
+ if (row.Contains("Component: Executer,Type: InputActorResult")) {
+ std::regex lockIdRegex(R"(LockId:\s*(\d+))");
+ std::smatch lockIdMatch;
+ UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract read lock id");
+ readLock = lockIdMatch[1].str();
+ }
+
+ // we need to find row with info about broken locks and extract lock id
+ if (row.Contains("Component: DataShard,Type: Locks")) {
+ std::regex lockIdRegex(R"(BreakLocks:\s*\[(\d+)\s*\])");
+ std::smatch lockIdMatch;
+ UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract broken lock id");
+ brokenLock = lockIdMatch[1].str();
+ }
+ }
+
+ UNIT_ASSERT_C(!readLock.empty() && readLock == brokenLock, "read lock should be broken");
+ }
+
+ Y_UNIT_TEST(BrokenReadLockAbortedTx) {
+ TStringStream ss;
+ {
+ TKikimrSettings serverSettings;
+ serverSettings.LogStream = &ss;
+ TKikimrRunner kikimr(serverSettings);
+ kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ std::optional<TTransaction> tx1;
+
+ { // tx1: read
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM `/Root/KeyValue` WHERE Key = 1u OR Key = 2u;
+ )", TTxControl::BeginTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1u];["One"]];
+ [[2u];["Two"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ tx1 = result.GetTransaction();
+ UNIT_ASSERT(tx1);
+ }
+
+ { // tx2: write + commit
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
+ (1u, "NewValue1");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ { // tx1: write + commit
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
+ (1000u, "Value1000");
+ )", TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+ }
+ }
+
+ auto logRows = SplitString(ss.Str(), "DATA_INTEGRITY");
+ std::string readLock;
+ std::string brokenLock;
+ for (const auto& row : logRows) {
+ // we need to find row with info about read physical tx and extract lock id
+ if (row.Contains("Component: Executer,Type: InputActorResult")) {
+ std::regex lockIdRegex(R"(LockId:\s*(\d+))");
+ std::smatch lockIdMatch;
+ UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract read lock id");
+ readLock = lockIdMatch[1].str();
+ }
+
+ // we need to find row with info about broken locks and extract lock id
+ if (row.Contains("Component: DataShard,Type: Locks")) {
+ std::regex lockIdRegex(R"(BreakLocks:\s*\[(\d+)\s*\])");
+ std::smatch lockIdMatch;
+ UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract broken lock id");
+ brokenLock = lockIdMatch[1].str();
+ }
+ }
+
+ UNIT_ASSERT_C(!readLock.empty() && readLock == brokenLock, "read lock should be broken");
}
}
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 263caa364a..a850072805 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -2326,7 +2326,7 @@ private:
sysLocks.BreakSetLocks();
}
- auto locks = sysLocks.ApplyLocks();
+ auto [locks, _] = sysLocks.ApplyLocks();
for (auto& lock : locks) {
NKikimrDataEvents::TLock* addLock;
@@ -2958,7 +2958,7 @@ public:
state.Lock = nullptr;
} else {
// Lock valid, apply conflict changes
- auto locks = sysLocks.ApplyLocks();
+ auto [locks, _] = sysLocks.ApplyLocks();
Y_ABORT_UNLESS(locks.empty(), "ApplyLocks acquired unexpected locks");
}
}
diff --git a/ydb/core/tx/datashard/datashard_integrity_trails.h b/ydb/core/tx/datashard/datashard_integrity_trails.h
index de0a65569a..0883b7f8d9 100644
--- a/ydb/core/tx/datashard/datashard_integrity_trails.h
+++ b/ydb/core/tx/datashard/datashard_integrity_trails.h
@@ -126,6 +126,32 @@ inline void LogIntegrityTrailsKeys(const NActors::TActorContext& ctx, const ui64
}
}
+inline void LogIntegrityTrailsLocks(const TActorContext& ctx, const ui64 tabletId, const ui64 txId, const TVector<ui64>& locks) {
+ if (locks.empty()) {
+ return;
+ }
+
+ auto logFn = [&]() {
+ TStringStream ss;
+
+ LogKeyValue("Component", "DataShard", ss);
+ LogKeyValue("Type", "Locks", ss);
+ LogKeyValue("TabletId", ToString(tabletId), ss);
+ LogKeyValue("PhyTxId", ToString(txId), ss);
+
+ ss << "BreakLocks: [";
+ for (const auto& lock : locks) {
+ ss << lock << " ";
+ }
+ ss << "]";
+
+ return ss.Str();
+ };
+
+ LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, logFn());
+
+}
+
template <typename TxResult>
inline void LogIntegrityTrailsFinish(const NActors::TActorContext& ctx, const ui64 tabletId, const ui64 txId, const typename TxResult::EStatus status) {
auto logFn = [&]() {
diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp
index 60f3ed8b03..e512e223e6 100644
--- a/ydb/core/tx/datashard/datashard_ut_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp
@@ -201,7 +201,7 @@ namespace NTest {
}
TVector<TSysLocks::TLock> ApplyTxLocks() {
- auto locks = Locks.ApplyLocks();
+ auto [locks, _] = Locks.ApplyLocks();
Locks.ResetUpdate();
return locks;
}
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index 5d45d88202..310bf3c539 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -346,7 +346,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
}
void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) {
- auto locks = DataShard.SysLocksTable().ApplyLocks();
+ auto [locks, _] = DataShard.SysLocksTable().ApplyLocks();
for (const auto& lock : locks) {
if (lock.IsError()) {
LOG_NOTICE_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD,
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 8c667b4266..7e989238ab 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_integrity_trails.h"
#include "datashard_kqp.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -214,7 +215,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
KqpEraseLocks(tabletId, kqpLocks, sysLocks);
- sysLocks.ApplyLocks();
+ auto [_, locksBrokenByTx] = sysLocks.ApplyLocks();
+ NDataIntegrity::LogIntegrityTrailsLocks(ctx, tabletId, txId, locksBrokenByTx);
DataShard.SubscribeNewLocks(ctx);
if (locksDb.HasChanges()) {
op->SetWaitCompletionFlag(true);
@@ -469,7 +471,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) {
- auto locks = DataShard.SysLocksTable().ApplyLocks();
+ auto [locks, locksBrokenByTx] = DataShard.SysLocksTable().ApplyLocks();
+ NDataIntegrity::LogIntegrityTrailsLocks(ctx, DataShard.TabletID(), op->GetTxId(), locksBrokenByTx);
LOG_T("add locks to result: " << locks.size());
for (const auto& lock : locks) {
if (lock.IsError()) {
diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp
index e6b7c43f2a..ef0bc85cc7 100644
--- a/ydb/core/tx/datashard/execute_write_unit.cpp
+++ b/ydb/core/tx/datashard/execute_write_unit.cpp
@@ -4,6 +4,7 @@
#include "datashard_locks_db.h"
#include "datashard_user_db.h"
#include "datashard_kqp.h"
+#include "datashard_integrity_trails.h"
#include <ydb/core/engine/mkql_engine_flat_host.h>
@@ -41,7 +42,8 @@ public:
void AddLocksToResult(TWriteOperation* writeOp, const TActorContext& ctx) {
NEvents::TDataEvents::TEvWriteResult& writeResult = *writeOp->GetWriteResult();
- auto locks = DataShard.SysLocksTable().ApplyLocks();
+ auto [locks, locksBrokenByTx] = DataShard.SysLocksTable().ApplyLocks();
+ NDataIntegrity::LogIntegrityTrailsLocks(ctx, DataShard.TabletID(), writeOp->GetTxId(), locksBrokenByTx);
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "add locks to result: " << locks.size());
for (const auto& lock : locks) {
if (lock.IsError()) {
@@ -344,7 +346,8 @@ public:
}
KqpEraseLocks(tabletId, kqpLocks, sysLocks);
- sysLocks.ApplyLocks();
+ auto [_, locksBrokenByTx] = sysLocks.ApplyLocks();
+ NDataIntegrity::LogIntegrityTrailsLocks(ctx, tabletId, txId, locksBrokenByTx);
DataShard.SubscribeNewLocks(ctx);
if (locksDb.HasChanges()) {
op->SetWaitCompletionFlag(true);
diff --git a/ydb/core/tx/locks/locks.cpp b/ydb/core/tx/locks/locks.cpp
index 8b2b6ef03b..6162f84432 100644
--- a/ydb/core/tx/locks/locks.cpp
+++ b/ydb/core/tx/locks/locks.cpp
@@ -973,7 +973,7 @@ TLocksUpdate::~TLocksUpdate() {
// TSysLocks
-TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
+std::pair<TVector<TSysLocks::TLock>, TVector<ui64>> TSysLocks::ApplyLocks() {
Y_ABORT_UNLESS(Update);
TMicrosecTimerCounter measureApplyLocks(*Self, COUNTER_APPLY_LOCKS_USEC);
@@ -988,8 +988,14 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
Locker.RemoveBrokenRanges();
Update->FlattenBreakLocks();
+
+ TVector<ui64> brokenLocks;
+ brokenLocks.reserve(Update->BreakLocks.Size());
if (Update->BreakLocks) {
Locker.BreakLocks(Update->BreakLocks, breakVersion);
+ for (const auto& lock : Update->BreakLocks) {
+ brokenLocks.push_back(lock.GetLockId());
+ }
}
Locker.SaveBrokenPersistentLocks(Db);
@@ -1019,7 +1025,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
// Adding read/write conflicts implies locking
Y_ABORT_UNLESS(!Update->ReadConflictLocks);
Y_ABORT_UNLESS(!Update->WriteConflictLocks);
- return TVector<TLock>();
+ return {TVector<TLock>(), brokenLocks};
}
TLockInfo::TPtr lock;
@@ -1093,7 +1099,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
out.emplace_back(MakeLock(Update->LockTxId, lock ? lock->GetGeneration() : Self->Generation(), counter,
table.GetTableId(), Update->Lock && Update->Lock->IsWriteLock()));
}
- return out;
+ return {out, brokenLocks};
}
void TSysLocks::UpdateCounters() {
diff --git a/ydb/core/tx/locks/locks.h b/ydb/core/tx/locks/locks.h
index 76c486123a..042c079e5f 100644
--- a/ydb/core/tx/locks/locks.h
+++ b/ydb/core/tx/locks/locks.h
@@ -878,7 +878,7 @@ public:
Locker.RemoveSchema(tableId, db);
}
- TVector<TLock> ApplyLocks();
+ std::pair<TVector<TLock>, TVector<ui64>> ApplyLocks();
ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const;
TLock GetLock(const TArrayRef<const TCell>& syslockKey) const;
void EraseLock(ui64 lockId);