summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <[email protected]>2025-04-21 13:39:50 +0300
committerziganshinmr <[email protected]>2025-04-21 13:56:18 +0300
commitbec56a4851da142085dcc9727c51127a20de0742 (patch)
tree999035e117029e2bd864bb0c52b12e47f207cae0
parent0f2979ead053c46892a944abc8a5417f25d6ec0f (diff)
Fix ExternalTempTablesCount handling in transaction cache
commit_hash:57272e28f5f7966187b7caf0ff0850cbe0b20930
-rw-r--r--yt/yql/providers/yt/gateway/lib/transaction_cache.cpp30
-rw-r--r--yt/yql/providers/yt/gateway/lib/transaction_cache.h7
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp2
3 files changed, 27 insertions, 12 deletions
diff --git a/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp b/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp
index 5e1f2925b02..2b3d4342e72 100644
--- a/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp
+++ b/yt/yql/providers/yt/gateway/lib/transaction_cache.cpp
@@ -18,7 +18,7 @@ using namespace NYT;
void TTransactionCache::TEntry::DeleteAtFinalizeUnlocked(const TString& table, bool isInternal)
{
- auto inserted = TablesToDeleteAtFinalize.insert(table);
+ auto inserted = TablesToDeleteAtFinalize.emplace(table, false);
if (!isInternal && inserted.second) {
if (++ExternalTempTablesCount > InflightTempTablesLimit) {
YQL_LOG_CTX_THROW yexception() << "Too many temporary tables registered - limit is " << InflightTempTablesLimit;
@@ -28,14 +28,28 @@ void TTransactionCache::TEntry::DeleteAtFinalizeUnlocked(const TString& table, b
bool TTransactionCache::TEntry::CancelDeleteAtFinalizeUnlocked(const TString& table, bool isInternal)
{
- auto erased = TablesToDeleteAtFinalize.erase(table);
- if (!isInternal) {
- YQL_ENSURE(erased <= ExternalTempTablesCount);
- ExternalTempTablesCount -= erased;
+ auto it = TablesToDeleteAtFinalize.find(table);
+ bool present = it != TablesToDeleteAtFinalize.end();
+ if (present) {
+ if (!isInternal && !it->second) {
+ YQL_ENSURE(ExternalTempTablesCount > 0);
+ ExternalTempTablesCount--;
+ }
+ TablesToDeleteAtFinalize.erase(it);
}
- return erased != 0;
+ return present;
}
+bool TTransactionCache::TEntry::AssumeAsDeletedAtFinalizeUnlocked(const TString& table) {
+ auto it = TablesToDeleteAtFinalize.find(table);
+ bool present = it != TablesToDeleteAtFinalize.end();
+ if (present && !it->second) {
+ YQL_ENSURE(ExternalTempTablesCount > 0);
+ ExternalTempTablesCount--;
+ it->second = true;
+ }
+ return present;
+}
void TTransactionCache::TEntry::RemoveInternal(const TString& table) {
bool existed;
@@ -57,7 +71,7 @@ void TTransactionCache::TEntry::DoRemove(const TString& table) {
void TTransactionCache::TEntry::Finalize(const TString& clusterName) {
NYT::ITransactionPtr binarySnapshotTx;
decltype(SnapshotTxs) snapshotTxs;
- THashSet<TString> toDelete;
+ THashMap<TString, bool> toDelete;
decltype(CheckpointTxs) checkpointTxs;
decltype(WriteTxs) writeTxs;
with_lock(Lock_) {
@@ -86,7 +100,7 @@ void TTransactionCache::TEntry::Finalize(const TString& clusterName) {
item.second->Abort();
}
- for (auto i : toDelete) {
+ for (auto& [i, _] : toDelete) {
DoRemove(i);
}
diff --git a/yt/yql/providers/yt/gateway/lib/transaction_cache.h b/yt/yql/providers/yt/gateway/lib/transaction_cache.h
index f3b7b56361b..61fc1254e05 100644
--- a/yt/yql/providers/yt/gateway/lib/transaction_cache.h
+++ b/yt/yql/providers/yt/gateway/lib/transaction_cache.h
@@ -42,7 +42,7 @@ public:
THashMap<NYT::TTransactionId, NYT::ITransactionPtr> SnapshotTxs;
THashMap<NYT::TTransactionId, NYT::ITransactionPtr> WriteTxs;
NYT::ITransactionPtr LastSnapshotTx;
- THashSet<TString> TablesToDeleteAtFinalize;
+ THashMap<TString, bool> TablesToDeleteAtFinalize; // table -> assumed as deleted
THashSet<TString> TablesToDeleteAtCommit;
ui32 InflightTempTablesLimit = Max<ui32>();
bool KeepTables = false;
@@ -85,11 +85,11 @@ public:
void Finalize(const TString& clusterName);
template<typename T>
- T FilterTablesToDeleteAtFinalize(const T& range) {
+ T AssumeAsDeletedAtFinalize(const T& range) {
T filteredRange;
with_lock(Lock_) {
for (const auto& i : range) {
- if (TablesToDeleteAtFinalize.contains(i)) {
+ if (AssumeAsDeletedAtFinalizeUnlocked(i)) {
filteredRange.insert(filteredRange.end(), i);
}
}
@@ -157,6 +157,7 @@ public:
void DeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
bool CancelDeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
+ bool AssumeAsDeletedAtFinalizeUnlocked(const TString& table);
void DoRemove(const TString& table);
size_t ExternalTempTablesCount = 0;
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 2072158f4e5..2465ceac4cd 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -2855,7 +2855,7 @@ private:
const auto entry = execCtx->GetEntry();
- toRemove = entry->FilterTablesToDeleteAtFinalize(toRemove);
+ toRemove = entry->AssumeAsDeletedAtFinalize(toRemove);
if (toRemove.empty()) {
return MakeFuture();
}