diff options
author | Iuliia Sidorina <yulia@ydb.tech> | 2025-03-04 12:07:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-04 12:07:39 +0100 |
commit | 9eea4e372ad51d5e90a0446e64a1751d2532214f (patch) | |
tree | cecc9443feed25f67837cd667e48e0de999afc2d | |
parent | 18f45b9e2316ffe02a1561d66e811d12f5abd8cc (diff) | |
download | ydb-9eea4e372ad51d5e90a0446e64a1751d2532214f.tar.gz |
feat(data_integrity_trails): add logging of acquired and broken locks (#13164)
-rw-r--r-- | ydb/core/kqp/common/kqp_data_integrity_trails.h | 97 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp | 156 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_integrity_trails.h | 26 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_locks.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_data_tx_unit.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_write_unit.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/locks/locks.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/locks/locks.h | 2 |
11 files changed, 282 insertions, 47 deletions
diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h index 71198204a5..28566a975d 100644 --- a/ydb/core/kqp/common/kqp_data_integrity_trails.h +++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h @@ -6,6 +6,8 @@ #include <library/cpp/string_utils/base64/base64.h> #include <ydb/core/data_integrity_trails/data_integrity_trails.h> +#include <ydb/core/tx/data_events/events.h> +#include <ydb/core/tx/datashard/datashard.h> namespace NKikimr { namespace NDataIntegrity { @@ -97,23 +99,110 @@ inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction } // DataExecuter -inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) { - auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) { +inline void LogIntegrityTrails(const TString& txType, const TString& txLocksDebugStr, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& txLocksDebugStr, const auto& traceId, const auto& txId, const auto& shardId) { TStringStream ss; LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Request", ss); LogKeyValue("TraceId", traceId, ss); LogKeyValue("PhyTxId", ToString(txId), ss); + LogKeyValue("Locks", "[" + txLocksDebugStr + "]", ss); if (shardId) { LogKeyValue("ShardId", ToString(*shardId), ss); } - LogKeyValue("Type", type, ss, /*last*/ true); + LogKeyValue("TxType", type, ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, txLocksDebugStr, traceId, txId, shardId)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const NEvents::TDataEvents::TEvWriteResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString() << " "; + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()), ss); + + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetIssues(), issues); + LogKeyValue("Issues", issues.ToString(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +inline void LogIntegrityTrails(const TString& state, const TString& traceId, const TEvDataShard::TEvProposeTransactionResult::TPtr& ev, const TActorContext& ctx) { + auto log = [](const auto& state, const auto& traceId, const auto& ev) { + const auto& record = ev->Get()->Record; + + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", "Response", ss); + LogKeyValue("State", state, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(record.GetTxId()), ss); + LogKeyValue("ShardId", ToString(record.GetOrigin()), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : record.GetTxLocks()) { + locksDebugStr << lock.ShortDebugString() << " "; + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); + LogKeyValue("Status", NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(ev->Get()->GetStatus()), ss); + LogKeyValue("Issues", ev->Get()->GetError(), ss, /*last*/ true); + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(state, traceId, ev)); +} + +template <typename TActorResultInfo> +inline void LogIntegrityTrails(const TString& type, const TString& traceId, ui64 txId, const TActorResultInfo& info, const TActorContext& ctx) { + auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& info) { + TStringStream ss; + LogKeyValue("Component", "Executer", ss); + LogKeyValue("Type", type, ss); + LogKeyValue("TraceId", traceId, ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + TStringBuilder locksDebugStr; + locksDebugStr << "["; + for (const auto& lock : info.GetLocks()) { + locksDebugStr << lock.ShortDebugString() << " "; + } + locksDebugStr << "]"; + + LogKeyValue("Locks", locksDebugStr, ss); return ss.Str(); }; - LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId)); + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(type, traceId, txId, info)); } // WriteActor,BufferActor diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index de55152154..aa6a1d1755 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -198,6 +198,7 @@ public: if (data.GetData().template Is<NKikimrTxDataShard::TEvKqpInputActorResultInfo>()) { NKikimrTxDataShard::TEvKqpInputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("InputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -216,6 +217,7 @@ public: } else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); + NDataIntegrity::LogIntegrityTrails("OutputActorResult", Request.UserTraceId, TxId, info, TlsActivationContext->AsActorContext()); for (auto& lock : info.GetLocks()) { if (!TxManager) { Locks.push_back(lock); @@ -506,6 +508,7 @@ private: TShardState* shardState = ShardStates.FindPtr(shardId); YQL_ENSURE(shardState, "Unexpected propose result from unknown tabletId " << shardId); + NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -575,6 +578,7 @@ private: NYql::TIssues issues; NYql::IssuesFromMessage(res->Record.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Prepare", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1105,7 +1109,7 @@ private: transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile); } - NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext()); + NDataIntegrity::LogIntegrityTrails("PlannedTx", "", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext()); LOG_D("Execute planned transaction, coordinator: " << TxCoordinator << " for " << affectedSet.size() << "shards"); Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true)); @@ -1246,6 +1250,7 @@ private: NYql::TIssues issues; NYql::IssuesFromMessage(res->Record.GetIssues(), issues); + NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId << ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus()) << ", TxId=" << ev->Get()->Record.GetTxId() @@ -1317,6 +1322,7 @@ private: TShardState* shardState = ShardStates.FindPtr(shardId); YQL_ENSURE(shardState); + NDataIntegrity::LogIntegrityTrails("Execute", Request.UserTraceId, ev, TlsActivationContext->AsActorContext()); LOG_D("Got propose result, shard: " << shardId << ", status: " << NKikimrTxDataShard::TEvProposeTransactionResult_EStatus_Name(res->GetStatus()) << ", error: " << res->GetError()); @@ -1807,7 +1813,8 @@ private: flags)); } - NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); + NDataIntegrity::LogIntegrityTrails("DatashardTx", dataTransaction.GetKqpTransaction().GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); ResponseEv->Orbit.Fork(evData->Orbit); ev = std::move(evData); @@ -1843,7 +1850,8 @@ private: auto traceId = ExecuterSpan.GetTraceId(); - NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); + NDataIntegrity::LogIntegrityTrails("EvWriteTx", evWriteTransaction->Record.GetLocks().ShortDebugString(), + Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext()); auto shardsToString = [](const auto& shards) { TStringBuilder builder; diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp index f8d362bb53..1625298ec2 100644 --- a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp +++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp @@ -1,5 +1,7 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <regex> + namespace NKikimr { namespace NKqp { @@ -43,7 +45,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { } // check executer logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 1 : 0); + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), LogEnabled ? 2 : 0); // check session actor logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), LogEnabled ? 2 : 0); // check grpc logs @@ -142,7 +144,7 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { // check write actor logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: WriteActor"), 3); // check executer logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 4); + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 11); // check session actor logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2); // check grpc logs @@ -203,8 +205,8 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } - // check executer logs (should be empty, because executer only logs modification operations) - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 0); + // check executer logs (should be 1, because executer only logs result for read actor) + UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1); // check session actor logs UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 2); // check grpc logs @@ -213,45 +215,143 @@ Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) { UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 0); } - Y_UNIT_TEST_TWIN(UpsertViaLegacyScripting, Streaming) { + Y_UNIT_TEST(BrokenReadLock) { TStringStream ss; { TKikimrSettings serverSettings; serverSettings.LogStream = &ss; TKikimrRunner kikimr(serverSettings); kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE); - NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + std::optional<TTransaction> tx1; - const auto query = R"( - --!syntax_v1 + { // tx1: read + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 - UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES - (3u, "Value3"), - (101u, "Value101"), - (201u, "Value201"); - )"; + SELECT * FROM `/Root/KeyValue` WHERE Key = 1u OR Key = 2u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // tx2: write + commit + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 - if (Streaming) { - auto result = client.StreamExecuteYqlScript(query).GetValueSync(); + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - CollectStreamResult(result); - } else { - auto result = client.ExecuteYqlScript(query).GetValueSync(); + } + + { // tx1: commit + auto result = tx1->Commit().ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); } } - - // check executer logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: Executer"), 1); - // check session actor logs (should contain double logs because this query was executed via worker actor) - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: SessionActor"), 4); - // check grpc logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY TRACE: Component: Grpc"), 2); - // check datashard logs - UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: DataShard"), 2); - Cout << ss.Str() << Endl; + auto logRows = SplitString(ss.Str(), "DATA_INTEGRITY"); + std::string readLock; + std::string brokenLock; + for (const auto& row : logRows) { + // we need to find row with info about read physical tx and extract lock id + if (row.Contains("Component: Executer,Type: InputActorResult")) { + std::regex lockIdRegex(R"(LockId:\s*(\d+))"); + std::smatch lockIdMatch; + UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract read lock id"); + readLock = lockIdMatch[1].str(); + } + + // we need to find row with info about broken locks and extract lock id + if (row.Contains("Component: DataShard,Type: Locks")) { + std::regex lockIdRegex(R"(BreakLocks:\s*\[(\d+)\s*\])"); + std::smatch lockIdMatch; + UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract broken lock id"); + brokenLock = lockIdMatch[1].str(); + } + } + + UNIT_ASSERT_C(!readLock.empty() && readLock == brokenLock, "read lock should be broken"); + } + + Y_UNIT_TEST(BrokenReadLockAbortedTx) { + TStringStream ss; + { + TKikimrSettings serverSettings; + serverSettings.LogStream = &ss; + TKikimrRunner kikimr(serverSettings); + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_TRACE); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + std::optional<TTransaction> tx1; + + { // tx1: read + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + SELECT * FROM `/Root/KeyValue` WHERE Key = 1u OR Key = 2u; + )", TTxControl::BeginTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1u];["One"]]; + [[2u];["Two"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + tx1 = result.GetTransaction(); + UNIT_ASSERT(tx1); + } + + { // tx2: write + commit + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES + (1u, "NewValue1"); + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { // tx1: write + commit + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES + (1000u, "Value1000"); + )", TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } + } + + auto logRows = SplitString(ss.Str(), "DATA_INTEGRITY"); + std::string readLock; + std::string brokenLock; + for (const auto& row : logRows) { + // we need to find row with info about read physical tx and extract lock id + if (row.Contains("Component: Executer,Type: InputActorResult")) { + std::regex lockIdRegex(R"(LockId:\s*(\d+))"); + std::smatch lockIdMatch; + UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract read lock id"); + readLock = lockIdMatch[1].str(); + } + + // we need to find row with info about broken locks and extract lock id + if (row.Contains("Component: DataShard,Type: Locks")) { + std::regex lockIdRegex(R"(BreakLocks:\s*\[(\d+)\s*\])"); + std::smatch lockIdMatch; + UNIT_ASSERT_C(std::regex_search(row.data(), lockIdMatch, lockIdRegex) || lockIdMatch.size() != 2, "failed to extract broken lock id"); + brokenLock = lockIdMatch[1].str(); + } + } + + UNIT_ASSERT_C(!readLock.empty() && readLock == brokenLock, "read lock should be broken"); } } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 263caa364a..a850072805 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -2326,7 +2326,7 @@ private: sysLocks.BreakSetLocks(); } - auto locks = sysLocks.ApplyLocks(); + auto [locks, _] = sysLocks.ApplyLocks(); for (auto& lock : locks) { NKikimrDataEvents::TLock* addLock; @@ -2958,7 +2958,7 @@ public: state.Lock = nullptr; } else { // Lock valid, apply conflict changes - auto locks = sysLocks.ApplyLocks(); + auto [locks, _] = sysLocks.ApplyLocks(); Y_ABORT_UNLESS(locks.empty(), "ApplyLocks acquired unexpected locks"); } } diff --git a/ydb/core/tx/datashard/datashard_integrity_trails.h b/ydb/core/tx/datashard/datashard_integrity_trails.h index de0a65569a..0883b7f8d9 100644 --- a/ydb/core/tx/datashard/datashard_integrity_trails.h +++ b/ydb/core/tx/datashard/datashard_integrity_trails.h @@ -126,6 +126,32 @@ inline void LogIntegrityTrailsKeys(const NActors::TActorContext& ctx, const ui64 } } +inline void LogIntegrityTrailsLocks(const TActorContext& ctx, const ui64 tabletId, const ui64 txId, const TVector<ui64>& locks) { + if (locks.empty()) { + return; + } + + auto logFn = [&]() { + TStringStream ss; + + LogKeyValue("Component", "DataShard", ss); + LogKeyValue("Type", "Locks", ss); + LogKeyValue("TabletId", ToString(tabletId), ss); + LogKeyValue("PhyTxId", ToString(txId), ss); + + ss << "BreakLocks: ["; + for (const auto& lock : locks) { + ss << lock << " "; + } + ss << "]"; + + return ss.Str(); + }; + + LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, logFn()); + +} + template <typename TxResult> inline void LogIntegrityTrailsFinish(const NActors::TActorContext& ctx, const ui64 tabletId, const ui64 txId, const typename TxResult::EStatus status) { auto logFn = [&]() { diff --git a/ydb/core/tx/datashard/datashard_ut_locks.cpp b/ydb/core/tx/datashard/datashard_ut_locks.cpp index 60f3ed8b03..e512e223e6 100644 --- a/ydb/core/tx/datashard/datashard_ut_locks.cpp +++ b/ydb/core/tx/datashard/datashard_ut_locks.cpp @@ -201,7 +201,7 @@ namespace NTest { } TVector<TSysLocks::TLock> ApplyTxLocks() { - auto locks = Locks.ApplyLocks(); + auto [locks, _] = Locks.ApplyLocks(); Locks.ResetUpdate(); return locks; } diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index 5d45d88202..310bf3c539 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -346,7 +346,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, } void TExecuteDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) { - auto locks = DataShard.SysLocksTable().ApplyLocks(); + auto [locks, _] = DataShard.SysLocksTable().ApplyLocks(); for (const auto& lock : locks) { if (lock.IsError()) { LOG_NOTICE_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD, diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 8c667b4266..7e989238ab 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_integrity_trails.h" #include "datashard_kqp.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -214,7 +215,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } KqpEraseLocks(tabletId, kqpLocks, sysLocks); - sysLocks.ApplyLocks(); + auto [_, locksBrokenByTx] = sysLocks.ApplyLocks(); + NDataIntegrity::LogIntegrityTrailsLocks(ctx, tabletId, txId, locksBrokenByTx); DataShard.SubscribeNewLocks(ctx); if (locksDb.HasChanges()) { op->SetWaitCompletionFlag(true); @@ -469,7 +471,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } void TExecuteKqpDataTxUnit::AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx) { - auto locks = DataShard.SysLocksTable().ApplyLocks(); + auto [locks, locksBrokenByTx] = DataShard.SysLocksTable().ApplyLocks(); + NDataIntegrity::LogIntegrityTrailsLocks(ctx, DataShard.TabletID(), op->GetTxId(), locksBrokenByTx); LOG_T("add locks to result: " << locks.size()); for (const auto& lock : locks) { if (lock.IsError()) { diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp index e6b7c43f2a..ef0bc85cc7 100644 --- a/ydb/core/tx/datashard/execute_write_unit.cpp +++ b/ydb/core/tx/datashard/execute_write_unit.cpp @@ -4,6 +4,7 @@ #include "datashard_locks_db.h" #include "datashard_user_db.h" #include "datashard_kqp.h" +#include "datashard_integrity_trails.h" #include <ydb/core/engine/mkql_engine_flat_host.h> @@ -41,7 +42,8 @@ public: void AddLocksToResult(TWriteOperation* writeOp, const TActorContext& ctx) { NEvents::TDataEvents::TEvWriteResult& writeResult = *writeOp->GetWriteResult(); - auto locks = DataShard.SysLocksTable().ApplyLocks(); + auto [locks, locksBrokenByTx] = DataShard.SysLocksTable().ApplyLocks(); + NDataIntegrity::LogIntegrityTrailsLocks(ctx, DataShard.TabletID(), writeOp->GetTxId(), locksBrokenByTx); LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "add locks to result: " << locks.size()); for (const auto& lock : locks) { if (lock.IsError()) { @@ -344,7 +346,8 @@ public: } KqpEraseLocks(tabletId, kqpLocks, sysLocks); - sysLocks.ApplyLocks(); + auto [_, locksBrokenByTx] = sysLocks.ApplyLocks(); + NDataIntegrity::LogIntegrityTrailsLocks(ctx, tabletId, txId, locksBrokenByTx); DataShard.SubscribeNewLocks(ctx); if (locksDb.HasChanges()) { op->SetWaitCompletionFlag(true); diff --git a/ydb/core/tx/locks/locks.cpp b/ydb/core/tx/locks/locks.cpp index 8b2b6ef03b..6162f84432 100644 --- a/ydb/core/tx/locks/locks.cpp +++ b/ydb/core/tx/locks/locks.cpp @@ -973,7 +973,7 @@ TLocksUpdate::~TLocksUpdate() { // TSysLocks -TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { +std::pair<TVector<TSysLocks::TLock>, TVector<ui64>> TSysLocks::ApplyLocks() { Y_ABORT_UNLESS(Update); TMicrosecTimerCounter measureApplyLocks(*Self, COUNTER_APPLY_LOCKS_USEC); @@ -988,8 +988,14 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { Locker.RemoveBrokenRanges(); Update->FlattenBreakLocks(); + + TVector<ui64> brokenLocks; + brokenLocks.reserve(Update->BreakLocks.Size()); if (Update->BreakLocks) { Locker.BreakLocks(Update->BreakLocks, breakVersion); + for (const auto& lock : Update->BreakLocks) { + brokenLocks.push_back(lock.GetLockId()); + } } Locker.SaveBrokenPersistentLocks(Db); @@ -1019,7 +1025,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { // Adding read/write conflicts implies locking Y_ABORT_UNLESS(!Update->ReadConflictLocks); Y_ABORT_UNLESS(!Update->WriteConflictLocks); - return TVector<TLock>(); + return {TVector<TLock>(), brokenLocks}; } TLockInfo::TPtr lock; @@ -1093,7 +1099,7 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { out.emplace_back(MakeLock(Update->LockTxId, lock ? lock->GetGeneration() : Self->Generation(), counter, table.GetTableId(), Update->Lock && Update->Lock->IsWriteLock())); } - return out; + return {out, brokenLocks}; } void TSysLocks::UpdateCounters() { diff --git a/ydb/core/tx/locks/locks.h b/ydb/core/tx/locks/locks.h index 76c486123a..042c079e5f 100644 --- a/ydb/core/tx/locks/locks.h +++ b/ydb/core/tx/locks/locks.h @@ -878,7 +878,7 @@ public: Locker.RemoveSchema(tableId, db); } - TVector<TLock> ApplyLocks(); + std::pair<TVector<TLock>, TVector<ui64>> ApplyLocks(); ui64 ExtractLockTxId(const TArrayRef<const TCell>& syslockKey) const; TLock GetLock(const TArrayRef<const TCell>& syslockKey) const; void EraseLock(ui64 lockId); |