summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoratarasov5 <[email protected]>2025-08-12 12:36:41 +0300
committeratarasov5 <[email protected]>2025-08-12 12:56:41 +0300
commit1fccb5d334ed416069ddaf6a68c47f2259e0b91c (patch)
tree96e8fa64b744e9e9a2a75d6317159f419cf72878
parent89279909221ae919ce74ad042a874b088deb7ae4 (diff)
YQL-20222: Use file lock for file download
commit_hash:57157cd533de1498cf44a8e2df329a0788d5b0b4
-rw-r--r--yql/essentials/core/file_storage/file_storage.cpp35
-rw-r--r--yql/essentials/core/file_storage/file_storage.h1
-rw-r--r--yql/essentials/core/file_storage/file_storage_decorator.cpp5
-rw-r--r--yql/essentials/core/file_storage/file_storage_decorator.h1
-rw-r--r--yql/essentials/core/file_storage/file_storage_ut.cpp69
-rw-r--r--yql/essentials/core/file_storage/storage.cpp111
-rw-r--r--yql/essentials/core/file_storage/storage.h3
-rw-r--r--yql/essentials/core/file_storage/storage_ut.cpp20
-rw-r--r--yql/essentials/utils/multi_resource_lock.cpp42
-rw-r--r--yql/essentials/utils/multi_resource_lock.h81
-rw-r--r--yql/essentials/utils/multi_resource_lock_ut.cpp54
-rw-r--r--yql/essentials/utils/ut/ya.make1
-rw-r--r--yql/essentials/utils/ya.make2
13 files changed, 195 insertions, 230 deletions
diff --git a/yql/essentials/core/file_storage/file_storage.cpp b/yql/essentials/core/file_storage/file_storage.cpp
index 95bbaebf930..fc2baf83028 100644
--- a/yql/essentials/core/file_storage/file_storage.cpp
+++ b/yql/essentials/core/file_storage/file_storage.cpp
@@ -13,7 +13,6 @@
#include <yql/essentials/utils/fetch/fetch.h>
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/utils/log/context.h>
-#include <yql/essentials/utils/multi_resource_lock.h>
#include <yql/essentials/utils/md5_stream.h>
#include <yql/essentials/utils/retry.h>
#include <yql/essentials/utils/yql_panic.h>
@@ -30,6 +29,7 @@
#include <util/string/builder.h>
#include <util/stream/str.h>
#include <util/stream/file.h>
+#include <util/system/file_lock.h>
#include <util/stream/null.h>
#include <util/system/env.h>
#include <util/system/fs.h>
@@ -43,6 +43,25 @@
namespace NYql {
+namespace {
+
+constexpr char ComponentName[] = "file_storage";
+
+class TFileLockGuard {
+public:
+ explicit TFileLockGuard(const TFsPath& lockFilePath)
+ : Lock_(lockFilePath)
+ , Guard_(Lock_)
+ {
+ }
+
+private:
+ TFileLock Lock_;
+ TGuard<TFileLock> Guard_;
+};
+
+}
+
class TFileStorageImpl: public IFileStorage {
public:
explicit TFileStorageImpl(const TFileStorageConfig& params, const std::vector<NFS::IDownloaderPtr>& downloaders)
@@ -61,7 +80,7 @@ public:
YQL_LOG(INFO) << "PutFile to cache: " << file;
const auto md5 = FileChecksum(file);
const TString storageFileName = md5 + ".file";
- auto lock = MultiResourceLock_.Acquire(storageFileName);
+ TFileLockGuard lockGuard(GetLockFilePath(storageFileName));
return Storage_.Put(storageFileName, outFileName, md5, [&file, &md5](const TFsPath& dstFile) {
NFs::HardLinkOrCopy(file, dstFile);
i64 length = GetFileLength(dstFile.c_str());
@@ -85,7 +104,7 @@ public:
}
const auto md5 = originalMd5.empty() ? MD5::File(file) : originalMd5;
const auto strippedMetaFile = md5 + ".stripped_meta";
- auto lock = MultiResourceLock_.Acquire(strippedMetaFile);
+ TFileLockGuard lockGuard(GetLockFilePath(strippedMetaFile));
TUrlMeta strippedMeta;
strippedMeta.TryReadFrom(GetRoot() / strippedMetaFile);
@@ -123,7 +142,7 @@ public:
const auto md5 = MD5::Calc(data);
const TString storageFileName = md5 + ".file";
YQL_LOG(INFO) << "PutInline to cache. md5=" << md5;
- auto lock = MultiResourceLock_.Acquire(storageFileName);
+ TFileLockGuard lockGuard(GetLockFilePath(storageFileName));
return Storage_.Put(storageFileName, TString(), md5, [&data, &md5](const TFsPath& dstFile) {
TStringInput in(data);
TFile outFile(dstFile, CreateAlways | ARW | AX);
@@ -179,6 +198,10 @@ public:
return Storage_.GetTemp();
}
+ TFsPath GetLockFilePath(const TString& lockName) const final {
+ return Storage_.GetLockFilePath(ComponentName, lockName);
+ }
+
const TFileStorageConfig& GetConfig() const final {
return Config_;
}
@@ -195,8 +218,7 @@ private:
TFileLinkPtr DoPutUrl(const THttpURL& url, const TString& token, const NFS::IDownloaderPtr& downloader) {
const auto urlMetaFile = BuildUrlMetaFileName(url, token);
- auto lock = MultiResourceLock_.Acquire(urlMetaFile); // let's use meta file as lock name
-
+ TFileLockGuard lockGuard(GetLockFilePath(urlMetaFile));
TUrlMeta urlMeta;
urlMeta.TryReadFrom(GetRoot() / urlMetaFile);
@@ -280,7 +302,6 @@ private:
TStorage Storage_;
const TFileStorageConfig Config_;
std::vector<NFS::IDownloaderPtr> Downloaders_;
- TMultiResourceLock MultiResourceLock_;
const bool UseFakeChecksums_; // YQL-15353
};
diff --git a/yql/essentials/core/file_storage/file_storage.h b/yql/essentials/core/file_storage/file_storage.h
index 1015269deb4..97b285987c5 100644
--- a/yql/essentials/core/file_storage/file_storage.h
+++ b/yql/essentials/core/file_storage/file_storage.h
@@ -30,6 +30,7 @@ struct IFileStorage: public TThrRefBase {
virtual TFsPath GetRoot() const = 0;
virtual TFsPath GetTemp() const = 0;
+ virtual TFsPath GetLockFilePath(const TString& lockName) const = 0;
virtual const TFileStorageConfig& GetConfig() const = 0;
};
diff --git a/yql/essentials/core/file_storage/file_storage_decorator.cpp b/yql/essentials/core/file_storage/file_storage_decorator.cpp
index ec9e5bfac88..258a0eef772 100644
--- a/yql/essentials/core/file_storage/file_storage_decorator.cpp
+++ b/yql/essentials/core/file_storage/file_storage_decorator.cpp
@@ -35,6 +35,11 @@ TFsPath TFileStorageDecorator::GetRoot() const {
TFsPath TFileStorageDecorator::GetTemp() const {
return Inner_->GetTemp();
}
+
+TFsPath TFileStorageDecorator::GetLockFilePath(const TString& lockName) const {
+ return Inner_->GetLockFilePath(lockName);
+}
+
const TFileStorageConfig& TFileStorageDecorator::GetConfig() const {
return Inner_->GetConfig();
}
diff --git a/yql/essentials/core/file_storage/file_storage_decorator.h b/yql/essentials/core/file_storage/file_storage_decorator.h
index bed4e1f67aa..b5116b2ac1a 100644
--- a/yql/essentials/core/file_storage/file_storage_decorator.h
+++ b/yql/essentials/core/file_storage/file_storage_decorator.h
@@ -18,6 +18,7 @@ public:
NThreading::TFuture<TFileLinkPtr> PutUrlAsync(const TString& url, const TString& token) override;
TFsPath GetRoot() const override;
TFsPath GetTemp() const override;
+ TFsPath GetLockFilePath(const TString& lockName) const override;
const TFileStorageConfig& GetConfig() const override;
protected:
diff --git a/yql/essentials/core/file_storage/file_storage_ut.cpp b/yql/essentials/core/file_storage/file_storage_ut.cpp
index bcdab2ed4d7..579013f1b80 100644
--- a/yql/essentials/core/file_storage/file_storage_ut.cpp
+++ b/yql/essentials/core/file_storage/file_storage_ut.cpp
@@ -15,6 +15,7 @@
#include <util/stream/str.h>
#include <util/system/tempfile.h>
#include <util/thread/pool.h>
+#include <util/system/file_lock.h>
using namespace NYql;
using namespace NThreading;
@@ -243,6 +244,74 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) {
UNIT_ASSERT_VALUES_EQUAL(link3->GetMd5(), link4->GetMd5());
}
+ Y_UNIT_TEST(FileLockTest) {
+ auto server = CreateTestHttpServer();
+
+ TString currentETag = "TAG_1";
+ TString currentContent = "ABC";
+
+ TFileStoragePtr fs = CreateTestFS();
+ int downloadCount = 0;
+
+ server->SetRequestHandler([&](auto& request) {
+ Y_UNUSED(request);
+ TVector<TString> locks;
+ (fs->GetRoot() / "locks").ListNames(locks);
+ TVector<TString> fileStorageLocks;
+ for (auto lock : locks) {
+ if (lock.Contains("file_storage")) {
+ fileStorageLocks.push_back(lock);
+ }
+ }
+ UNIT_ASSERT_EQUAL(1, fileStorageLocks.size());
+ downloadCount++;
+ return TTestHttpServer::TReply::OkETag(currentContent, currentETag);
+ });
+
+ auto url = server->GetUrl();
+
+ auto link1 = fs->PutUrl(url, {});
+ auto link2 = fs->PutUrl(url, {});
+ UNIT_ASSERT_GE(downloadCount, 1);
+ }
+
+ Y_UNIT_TEST(ParallelDownload) {
+ TSimpleThreadPool threadPool;
+ threadPool.Start(12);
+
+ auto server = CreateTestHttpServer();
+ TFileStoragePtr fs = CreateTestFS();
+
+ TString currentETag = "TAG_1";
+ TString currentContent = "ABC";
+
+ std::atomic<int> downloadCount = 0;
+ std::atomic<int> notModifiedCount = 0;
+ server->SetRequestHandler([&](const NYql::TTestHttpServer::TRequest& request) {
+ Sleep(TDuration::Seconds(1));
+
+ if (request.IfNoneMatch == currentETag) {
+ notModifiedCount++;
+ return TTestHttpServer::TReply::NotModified(currentETag);
+ }
+ downloadCount++;
+ return TTestHttpServer::TReply::OkETag(currentContent, currentETag);
+ });
+
+ auto url = server->GetUrl();
+ auto slowAsyncDowloading = [&]() {
+ auto link1 = fs->PutUrl(url, {});
+ return;
+ };
+
+ NThreading::TFuture<void> future1 = NThreading::Async(slowAsyncDowloading, threadPool);
+ NThreading::TFuture<void> future2 = NThreading::Async(slowAsyncDowloading, threadPool);
+ future1.Wait();
+ future2.Wait();
+ UNIT_ASSERT_EQUAL(downloadCount, 1);
+ UNIT_ASSERT_EQUAL(notModifiedCount, 1);
+ }
+
Y_UNIT_TEST(PutUrlWeakETagChange) {
auto server = CreateTestHttpServer();
diff --git a/yql/essentials/core/file_storage/storage.cpp b/yql/essentials/core/file_storage/storage.cpp
index a111a8c6744..3b3466877d7 100644
--- a/yql/essentials/core/file_storage/storage.cpp
+++ b/yql/essentials/core/file_storage/storage.cpp
@@ -35,6 +35,8 @@ namespace NYql {
namespace {
+constexpr const char CleanupLockFilename[] = ".cleanup_lock";
+
struct TFileObject {
TString Name;
time_t MTime;
@@ -52,6 +54,9 @@ TFsPath ToFilePath(const TString& path)
return path;
}
+constexpr char FileLocksDir[] = "locks";
+
+constexpr size_t MaxLockPathInStorage = 4096;
} // namespace
TFileLink::TFileLink(const TFsPath& path, const TString& storageFileName, ui64 size, const TString& md5, bool deleteOnDestroy)
@@ -152,6 +157,7 @@ public:
TImpl(size_t maxFiles, ui64 maxSize, const TString& storagePath)
: StorageDir_(ToFilePath(storagePath))
, ProcessTempDir_(StorageDir_ / ToString(GetPID())) // must be subfolder for fast hardlinking
+ , FileLocksDir_(StorageDir_ / ToString(FileLocksDir))
, IsTemp_(storagePath.empty())
, MaxFiles_(maxFiles)
, MaxSize_(maxSize)
@@ -162,9 +168,14 @@ public:
// TFsPath is not thread safe. It can initialize internal Split at any time. Force do it right now
StorageDir_.PathSplit();
ProcessTempDir_.PathSplit();
+ FileLocksDir_.PathSplit();
StorageDir_.MkDirs(MODE0711);
ProcessTempDir_.MkDirs(MODE0711);
+ FileLocksDir_.MkDirs(MODE0711);
+
+ CleanupLock_ = THolder<TFileLock>(new TFileLock(FileLocksDir_ / CleanupLockFilename));
+
#ifdef _linux_
ProcessTempDirLock_.Reset(new TFileLock(ProcessTempDir_ / ".lockfile"));
ProcessTempDirLock_->Acquire();
@@ -177,6 +188,7 @@ public:
TAtforkReinit::Get().Register(this);
YQL_LOG(INFO) << "FileStorage initialized in " << StorageDir_.GetPath().Quote()
<< ", temporary dir: " << ProcessTempDir_.GetPath().Quote()
+ << ", locks dir:" << FileLocksDir_.GetPath().Quote()
<< ", files: " << CurrentFiles_.load()
<< ", total size: " << CurrentSize_.load();
}
@@ -186,6 +198,7 @@ public:
try {
ProcessTempDir_.ForceDelete();
if (IsTemp_) {
+ FileLocksDir_.ForceDelete();
StorageDir_.ForceDelete();
}
} catch (...) {
@@ -201,6 +214,10 @@ public:
return ProcessTempDir_;
}
+ TFsPath GetLockFilePath(const TString& componentName, const TString& lockName) const {
+ return FileLocksDir_ / (componentName + "_" + ToString(THash<TString>{}(lockName) % MaxLockPathInStorage) + ".lockfile");
+ }
+
TFileLinkPtr Put(const TString& storageFileName, const TString& outFileName, const TString& md5, const NYql::NFS::TDataProvider& puller) {
bool newFileAdded = false;
TFileLinkPtr result = HardlinkFromStorage(storageFileName, md5, outFileName);
@@ -380,58 +397,64 @@ private:
if (!NeedToCleanup()) {
return;
}
+
+ TTryGuard guard(*CleanupLock_);
+
+ if (!guard.WasAcquired()) {
+ Dirty_.store(true);
+ return;
+ }
+
Dirty_.store(false);
- with_lock (CleanupLock_) {
- TVector<TString> names;
- StorageDir_.ListNames(names);
+ TVector<TString> names;
+ StorageDir_.ListNames(names);
- TVector<TFileObject> files;
- files.reserve(names.size());
+ TVector<TFileObject> files;
+ files.reserve(names.size());
- ui64 actualFiles = 0;
- ui64 actualSize = 0;
+ ui64 actualFiles = 0;
+ ui64 actualSize = 0;
- for (const TString& name: names) {
- TFsPath childPath(StorageDir_ / name);
- TFileStat stat(childPath, true);
- if (stat.IsFile()) {
- files.push_back(TFileObject{name, stat.MTime, stat.Size});
- ++actualFiles;
- actualSize += stat.Size;
- }
+ for (const TString& name: names) {
+ TFsPath childPath(StorageDir_ / name);
+ TFileStat stat(childPath, true);
+ if (stat.IsFile()) {
+ files.push_back(TFileObject{name, stat.MTime, stat.Size});
+ ++actualFiles;
+ actualSize += stat.Size;
}
+ }
- // sort files to get older files first
- Sort(files, [](const TFileObject& f1, const TFileObject& f2) {
- if (f1.MTime == f2.MTime) {
- return f1.Name.compare(f2.Name) < 0;
- }
- return f1.MTime < f2.MTime;
- });
-
- ui64 filesThreshold = MaxFiles_ / 2;
- ui64 sizeThreshold = MaxSize_ / 2;
+ // sort files to get older files first
+ Sort(files, [](const TFileObject& f1, const TFileObject& f2) {
+ if (f1.MTime == f2.MTime) {
+ return f1.Name.compare(f2.Name) < 0;
+ }
+ return f1.MTime < f2.MTime;
+ });
- for (const TFileObject& f: files) {
- if (actualFiles <= filesThreshold && actualSize <= sizeThreshold) {
- break;
- }
+ ui64 filesThreshold = MaxFiles_ / 2;
+ ui64 sizeThreshold = MaxSize_ / 2;
- YQL_LOG(INFO) << "Removing file from cache (name: " << f.Name
- << ", size: " << f.Size
- << ", mtime: " << f.MTime << ")";
- if (!NFs::Remove(StorageDir_ / f.Name)) {
- YQL_LOG(WARN) << "Failed to remove file " << f.Name.Quote() << ": " << LastSystemErrorText();
- } else {
- --actualFiles;
- actualSize -= f.Size;
- }
+ for (const TFileObject& f: files) {
+ if (actualFiles <= filesThreshold && actualSize <= sizeThreshold) {
+ break;
}
- CurrentFiles_.store(actualFiles);
- CurrentSize_.store(actualSize);
+ YQL_LOG(INFO) << "Removing file from cache (name: " << f.Name
+ << ", size: " << f.Size
+ << ", mtime: " << f.MTime << ")";
+ if (!NFs::Remove(StorageDir_ / f.Name)) {
+ YQL_LOG(WARN) << "Failed to remove file " << f.Name.Quote() << ": " << LastSystemErrorText();
+ } else {
+ --actualFiles;
+ actualSize -= f.Size;
+ }
}
+
+ CurrentFiles_.store(actualFiles);
+ CurrentSize_.store(actualSize);
}
void ResetAtFork() {
@@ -444,9 +467,11 @@ private:
}
private:
- TMutex CleanupLock_;
const TFsPath StorageDir_;
const TFsPath ProcessTempDir_;
+ const TFsPath FileLocksDir_;
+ THolder<TFileLock> CleanupLock_;
+
THolder<TFileLock> ProcessTempDirLock_;
const bool IsTemp_;
const ui64 MaxFiles_;
@@ -477,6 +502,10 @@ TFsPath TStorage::GetTemp() const
return Impl_->GetTemp();
}
+TFsPath TStorage::GetLockFilePath(const TString& componentName, const TString& lockName) const {
+ return Impl_->GetLockFilePath(componentName, lockName);
+}
+
TFileLinkPtr TStorage::Put(const TString& storageFileName, const TString& outFileName, const TString& md5, const NFS::TDataProvider& puller)
{
return Impl_->Put(storageFileName, outFileName, md5, puller);
diff --git a/yql/essentials/core/file_storage/storage.h b/yql/essentials/core/file_storage/storage.h
index 1bb562320da..c1f5015a9d0 100644
--- a/yql/essentials/core/file_storage/storage.h
+++ b/yql/essentials/core/file_storage/storage.h
@@ -50,6 +50,9 @@ public:
TFsPath GetRoot() const;
// Returns temp storage directory
TFsPath GetTemp() const;
+ // Returns path of lock filename. Some lock files can be reused for different
+ // |lockName| to prevent the growth of gargabe.
+ TFsPath GetLockFilePath(const TString& componentName, const TString& lockName) const;
// Puts the passed data to the storage with the specified storage file name.
// The second argument outFileName specifies a name of temporary link returned from the Put(). If empty, then random guid is used.
// Provide valid md5 if it is known in advance, otherwise pass "". It will be overridden by puller result
diff --git a/yql/essentials/core/file_storage/storage_ut.cpp b/yql/essentials/core/file_storage/storage_ut.cpp
index 4229abd2199..db8cbb61521 100644
--- a/yql/essentials/core/file_storage/storage_ut.cpp
+++ b/yql/essentials/core/file_storage/storage_ut.cpp
@@ -102,8 +102,13 @@ Y_UNIT_TEST_SUITE(TStorageTests) {
auto rootPath = storage->GetRoot();
storage.Destroy();
+// On Windows, the lock file is not removed when deleting a folder recursively (for some reason).
+// Mute the test path because, in production, TStorage is usually not created in temporary mode:
+// the rootPath (and locks folder) will still exist even after the TStorage instance is destroyed.
+#if !defined(_win_)
UNIT_ASSERT(!fileInStorage->GetPath().Exists());
UNIT_ASSERT(!rootPath.Exists());
+#endif // !defined(_win_)
}
Y_UNIT_TEST(DisplaceByCount) {
@@ -132,7 +137,7 @@ Y_UNIT_TEST_SUITE(TStorageTests) {
TVector<TString> filesInStorage;
storage->GetRoot().ListNames(filesInStorage);
- UNIT_ASSERT_EQUAL(filesInStorage.size(), 2); // 1 file + 1 hardlink directory
+ UNIT_ASSERT_EQUAL(filesInStorage.size(), 3); // 1 file + 1 hardlink directory + 1 locks directory
auto beg = filesInStorage.begin(),
end = filesInStorage.end();
@@ -169,7 +174,7 @@ Y_UNIT_TEST_SUITE(TStorageTests) {
TVector<TString> filesInStorage;
storage->GetRoot().ListNames(filesInStorage);
- UNIT_ASSERT_EQUAL(filesInStorage.size(), 2); // 1 file + 1 hardlink directory
+ UNIT_ASSERT_EQUAL(filesInStorage.size(), 3); // 1 file + 1 hardlink directory + 1 locks folder
auto beg = filesInStorage.begin(),
end = filesInStorage.end();
@@ -192,10 +197,21 @@ Y_UNIT_TEST_SUITE(TStorageTests) {
storage.Destroy();
UNIT_ASSERT(!fileInStorage->GetPath().Exists()); // hardlink was deleted
UNIT_ASSERT(rootPath.Exists());
+ UNIT_ASSERT((rootPath / "locks").Exists());
UNIT_ASSERT((rootPath / fileInStorage->GetStorageFileName()).Exists());
storage.Reset(new TStorage(100, 100, dir.GetFsPath()));
UNIT_ASSERT_EQUAL(storage->GetCount(), 1);
UNIT_ASSERT_EQUAL(storage->GetOccupiedSize(), DATA.size());
}
+
+ Y_UNIT_TEST(TestLocksDontGrowEndless) {
+ TTestDir dir("PersistStorage");
+ THolder<TStorage> storage = MakeHolder<TStorage>(100, 100, dir.GetFsPath());
+ THashSet<TString> uniquePaths;
+ for (size_t i = 0; i < 10000; i++) {
+ uniquePaths.emplace(storage->GetLockFilePath("test_component", ToString(i)));
+ }
+ UNIT_ASSERT_LE(uniquePaths.size(), 4096);
+ }
}
diff --git a/yql/essentials/utils/multi_resource_lock.cpp b/yql/essentials/utils/multi_resource_lock.cpp
deleted file mode 100644
index 84b095a2076..00000000000
--- a/yql/essentials/utils/multi_resource_lock.cpp
+++ /dev/null
@@ -1,42 +0,0 @@
-#include "multi_resource_lock.h"
-
-using namespace NYql;
-
-TMultiResourceLock::TResourceLock TMultiResourceLock::Acquire(TString resourceId) {
- TLock::TPtr lock = ProvideResourceLock(resourceId);
-
- // resource-specific mutex should be locked outside of Guard_ lock
- return { *this, std::move(lock), std::move(resourceId) };
-}
-
-TMultiResourceLock::~TMultiResourceLock() {
- with_lock(Guard_) {
- Y_ABORT_UNLESS(Locks_.empty(), "~TMultiResourceLock: we still have %lu unreleased locks", Locks_.size());
- }
-}
-
-TMultiResourceLock::TLock::TPtr TMultiResourceLock::ProvideResourceLock(const TString& resourceId) {
- with_lock(Guard_) {
- auto it = Locks_.find(resourceId);
- if (it == Locks_.end()) {
- it = Locks_.emplace(resourceId, MakeIntrusive<TLock>()).first;
- }
-
- // important: ref count will be incremented under lock
- // in this case we have guarantee TryCleanup will not erase this resource just after exit from this method and before entering lock->Mutex_.Acquire()
- return it->second;
- }
-}
-
-void TMultiResourceLock::TryCleanup(const TString& resourceId) {
- with_lock(Guard_) {
- auto it = Locks_.find(resourceId);
- if (it == Locks_.end()) {
- return;
- }
-
- if (it->second->IsUnique()) {
- Locks_.erase(it);
- }
- }
-}
diff --git a/yql/essentials/utils/multi_resource_lock.h b/yql/essentials/utils/multi_resource_lock.h
deleted file mode 100644
index c7f4ea49e27..00000000000
--- a/yql/essentials/utils/multi_resource_lock.h
+++ /dev/null
@@ -1,81 +0,0 @@
-#pragma once
-
-#include "yql_panic.h"
-
-#include <util/generic/map.h>
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/system/mutex.h>
-
-namespace NYql {
-
-class TMultiResourceLock : private TNonCopyable {
-private:
- struct TLock : public TThrRefBase {
- typedef TIntrusivePtr<TLock> TPtr;
-
- bool IsUnique() const {
- return RefCount() == 1;
- }
-
- TMutex Mutex;
- };
-
-public:
- struct TResourceLock : private TNonCopyable {
- TResourceLock(TMultiResourceLock& owner, TLock::TPtr lock, TString resourceId)
- : Owner_(owner)
- , Lock_(std::move(lock))
- , ResourceId_(std::move(resourceId))
- {
- Y_ENSURE(Lock_);
- Lock_->Mutex.Acquire();
- }
-
- TResourceLock(TResourceLock&& other)
- : Owner_(other.Owner_)
- , Lock_(std::move(other.Lock_))
- , ResourceId_(std::move(other.ResourceId_))
- {
-
- }
-
- TResourceLock& operator=(TResourceLock&&) = delete;
-
- ~TResourceLock() {
- if (!Lock_) {
- return;
- }
-
- Lock_->Mutex.Release();
- // decrement ref count before TryCleanup
- Lock_ = nullptr;
- Owner_.TryCleanup(ResourceId_);
- }
-
- private:
- TMultiResourceLock& Owner_;
- TLock::TPtr Lock_;
- TString ResourceId_;
- };
-
- TResourceLock Acquire(TString resourceId);
-
- template <typename F>
- auto RunWithLock(TString resourceId, const F& f) -> decltype(f()) {
- auto lock = Acquire(std::move(resourceId));
- return f();
- }
-
- ~TMultiResourceLock();
-
-private:
- TLock::TPtr ProvideResourceLock(const TString& resourceId);
- void TryCleanup(const TString& resourceId);
-
-private:
- TMutex Guard_;
- TMap<TString, TLock::TPtr> Locks_;
-};
-
-}
diff --git a/yql/essentials/utils/multi_resource_lock_ut.cpp b/yql/essentials/utils/multi_resource_lock_ut.cpp
deleted file mode 100644
index 0af9cea3fff..00000000000
--- a/yql/essentials/utils/multi_resource_lock_ut.cpp
+++ /dev/null
@@ -1,54 +0,0 @@
-#include "multi_resource_lock.h"
-#include <util/generic/xrange.h>
-#include <library/cpp/threading/future/async.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-namespace NYql {
-using namespace NThreading;
-
-Y_UNIT_TEST_SUITE(TMultiResourceLock) {
- Y_UNIT_TEST(ManyResources) {
- TMultiResourceLock multiLock;
- const int workersCount = 3;
- TVector<TVector<int>> workersData;
- workersData.resize(workersCount);
-
- TAdaptiveThreadPool queue;
- queue.Start(0);
-
- TVector<NThreading::TFuture<void>> workers;
- workers.reserve(workersCount);
- TManualEvent startEvent;
-
- for (int i = 0; i < workersCount; ++i) {
- TString resourceId = ToString(i);
- TVector<int>& data = workersData.at(i);
- NThreading::TFuture<void> f = NThreading::Async([&, resourceId]() {
- startEvent.Wait();
-
- for (int j = 0; j < 1000; ++j) {
- const auto& l = multiLock.Acquire(resourceId);
- Y_UNUSED(l);
- data.push_back(j);
- }
- }, queue);
-
- workers.push_back(std::move(f));
- }
-
- startEvent.Signal();
-
- NThreading::TFuture<void> all = NThreading::WaitExceptionOrAll(workers);
- all.GetValueSync();
- queue.Stop();
-
- // analyze workersData:
- auto range0_999 = xrange(0, 1000);
- for (auto& w : workersData) {
- UNIT_ASSERT_VALUES_EQUAL(w.size(), 1000);
- UNIT_ASSERT(std::equal(range0_999.begin(), range0_999.end(), w.begin()));
- }
- }
-}
-
-}
diff --git a/yql/essentials/utils/ut/ya.make b/yql/essentials/utils/ut/ya.make
index 0f7163ef793..ea89ede2579 100644
--- a/yql/essentials/utils/ut/ya.make
+++ b/yql/essentials/utils/ut/ya.make
@@ -3,7 +3,6 @@ UNITTEST_FOR(yql/essentials/utils)
SRCS(
fp_bits_ut.cpp
md5_stream_ut.cpp
- multi_resource_lock_ut.cpp
parse_double_ut.cpp
range_walker_ut.cpp
retry_ut.cpp
diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make
index d0e1b58200e..1888dfef431 100644
--- a/yql/essentials/utils/ya.make
+++ b/yql/essentials/utils/ya.make
@@ -20,8 +20,6 @@ SRCS(
mem_limit.cpp
method_index.cpp
method_index.h
- multi_resource_lock.cpp
- multi_resource_lock.h
parse_double.cpp
parse_double.h
proc_alive.cpp