diff options
author | atarasov5 <[email protected]> | 2025-08-12 12:36:41 +0300 |
---|---|---|
committer | atarasov5 <[email protected]> | 2025-08-12 12:56:41 +0300 |
commit | 1fccb5d334ed416069ddaf6a68c47f2259e0b91c (patch) | |
tree | 96e8fa64b744e9e9a2a75d6317159f419cf72878 | |
parent | 89279909221ae919ce74ad042a874b088deb7ae4 (diff) |
YQL-20222: Use file lock for file download
commit_hash:57157cd533de1498cf44a8e2df329a0788d5b0b4
-rw-r--r-- | yql/essentials/core/file_storage/file_storage.cpp | 35 | ||||
-rw-r--r-- | yql/essentials/core/file_storage/file_storage.h | 1 | ||||
-rw-r--r-- | yql/essentials/core/file_storage/file_storage_decorator.cpp | 5 | ||||
-rw-r--r-- | yql/essentials/core/file_storage/file_storage_decorator.h | 1 | ||||
-rw-r--r-- | yql/essentials/core/file_storage/file_storage_ut.cpp | 69 | ||||
-rw-r--r-- | yql/essentials/core/file_storage/storage.cpp | 111 | ||||
-rw-r--r-- | yql/essentials/core/file_storage/storage.h | 3 | ||||
-rw-r--r-- | yql/essentials/core/file_storage/storage_ut.cpp | 20 | ||||
-rw-r--r-- | yql/essentials/utils/multi_resource_lock.cpp | 42 | ||||
-rw-r--r-- | yql/essentials/utils/multi_resource_lock.h | 81 | ||||
-rw-r--r-- | yql/essentials/utils/multi_resource_lock_ut.cpp | 54 | ||||
-rw-r--r-- | yql/essentials/utils/ut/ya.make | 1 | ||||
-rw-r--r-- | yql/essentials/utils/ya.make | 2 |
13 files changed, 195 insertions, 230 deletions
diff --git a/yql/essentials/core/file_storage/file_storage.cpp b/yql/essentials/core/file_storage/file_storage.cpp index 95bbaebf930..fc2baf83028 100644 --- a/yql/essentials/core/file_storage/file_storage.cpp +++ b/yql/essentials/core/file_storage/file_storage.cpp @@ -13,7 +13,6 @@ #include <yql/essentials/utils/fetch/fetch.h> #include <yql/essentials/utils/log/log.h> #include <yql/essentials/utils/log/context.h> -#include <yql/essentials/utils/multi_resource_lock.h> #include <yql/essentials/utils/md5_stream.h> #include <yql/essentials/utils/retry.h> #include <yql/essentials/utils/yql_panic.h> @@ -30,6 +29,7 @@ #include <util/string/builder.h> #include <util/stream/str.h> #include <util/stream/file.h> +#include <util/system/file_lock.h> #include <util/stream/null.h> #include <util/system/env.h> #include <util/system/fs.h> @@ -43,6 +43,25 @@ namespace NYql { +namespace { + +constexpr char ComponentName[] = "file_storage"; + +class TFileLockGuard { +public: + explicit TFileLockGuard(const TFsPath& lockFilePath) + : Lock_(lockFilePath) + , Guard_(Lock_) + { + } + +private: + TFileLock Lock_; + TGuard<TFileLock> Guard_; +}; + +} + class TFileStorageImpl: public IFileStorage { public: explicit TFileStorageImpl(const TFileStorageConfig& params, const std::vector<NFS::IDownloaderPtr>& downloaders) @@ -61,7 +80,7 @@ public: YQL_LOG(INFO) << "PutFile to cache: " << file; const auto md5 = FileChecksum(file); const TString storageFileName = md5 + ".file"; - auto lock = MultiResourceLock_.Acquire(storageFileName); + TFileLockGuard lockGuard(GetLockFilePath(storageFileName)); return Storage_.Put(storageFileName, outFileName, md5, [&file, &md5](const TFsPath& dstFile) { NFs::HardLinkOrCopy(file, dstFile); i64 length = GetFileLength(dstFile.c_str()); @@ -85,7 +104,7 @@ public: } const auto md5 = originalMd5.empty() ? MD5::File(file) : originalMd5; const auto strippedMetaFile = md5 + ".stripped_meta"; - auto lock = MultiResourceLock_.Acquire(strippedMetaFile); + TFileLockGuard lockGuard(GetLockFilePath(strippedMetaFile)); TUrlMeta strippedMeta; strippedMeta.TryReadFrom(GetRoot() / strippedMetaFile); @@ -123,7 +142,7 @@ public: const auto md5 = MD5::Calc(data); const TString storageFileName = md5 + ".file"; YQL_LOG(INFO) << "PutInline to cache. md5=" << md5; - auto lock = MultiResourceLock_.Acquire(storageFileName); + TFileLockGuard lockGuard(GetLockFilePath(storageFileName)); return Storage_.Put(storageFileName, TString(), md5, [&data, &md5](const TFsPath& dstFile) { TStringInput in(data); TFile outFile(dstFile, CreateAlways | ARW | AX); @@ -179,6 +198,10 @@ public: return Storage_.GetTemp(); } + TFsPath GetLockFilePath(const TString& lockName) const final { + return Storage_.GetLockFilePath(ComponentName, lockName); + } + const TFileStorageConfig& GetConfig() const final { return Config_; } @@ -195,8 +218,7 @@ private: TFileLinkPtr DoPutUrl(const THttpURL& url, const TString& token, const NFS::IDownloaderPtr& downloader) { const auto urlMetaFile = BuildUrlMetaFileName(url, token); - auto lock = MultiResourceLock_.Acquire(urlMetaFile); // let's use meta file as lock name - + TFileLockGuard lockGuard(GetLockFilePath(urlMetaFile)); TUrlMeta urlMeta; urlMeta.TryReadFrom(GetRoot() / urlMetaFile); @@ -280,7 +302,6 @@ private: TStorage Storage_; const TFileStorageConfig Config_; std::vector<NFS::IDownloaderPtr> Downloaders_; - TMultiResourceLock MultiResourceLock_; const bool UseFakeChecksums_; // YQL-15353 }; diff --git a/yql/essentials/core/file_storage/file_storage.h b/yql/essentials/core/file_storage/file_storage.h index 1015269deb4..97b285987c5 100644 --- a/yql/essentials/core/file_storage/file_storage.h +++ b/yql/essentials/core/file_storage/file_storage.h @@ -30,6 +30,7 @@ struct IFileStorage: public TThrRefBase { virtual TFsPath GetRoot() const = 0; virtual TFsPath GetTemp() const = 0; + virtual TFsPath GetLockFilePath(const TString& lockName) const = 0; virtual const TFileStorageConfig& GetConfig() const = 0; }; diff --git a/yql/essentials/core/file_storage/file_storage_decorator.cpp b/yql/essentials/core/file_storage/file_storage_decorator.cpp index ec9e5bfac88..258a0eef772 100644 --- a/yql/essentials/core/file_storage/file_storage_decorator.cpp +++ b/yql/essentials/core/file_storage/file_storage_decorator.cpp @@ -35,6 +35,11 @@ TFsPath TFileStorageDecorator::GetRoot() const { TFsPath TFileStorageDecorator::GetTemp() const { return Inner_->GetTemp(); } + +TFsPath TFileStorageDecorator::GetLockFilePath(const TString& lockName) const { + return Inner_->GetLockFilePath(lockName); +} + const TFileStorageConfig& TFileStorageDecorator::GetConfig() const { return Inner_->GetConfig(); } diff --git a/yql/essentials/core/file_storage/file_storage_decorator.h b/yql/essentials/core/file_storage/file_storage_decorator.h index bed4e1f67aa..b5116b2ac1a 100644 --- a/yql/essentials/core/file_storage/file_storage_decorator.h +++ b/yql/essentials/core/file_storage/file_storage_decorator.h @@ -18,6 +18,7 @@ public: NThreading::TFuture<TFileLinkPtr> PutUrlAsync(const TString& url, const TString& token) override; TFsPath GetRoot() const override; TFsPath GetTemp() const override; + TFsPath GetLockFilePath(const TString& lockName) const override; const TFileStorageConfig& GetConfig() const override; protected: diff --git a/yql/essentials/core/file_storage/file_storage_ut.cpp b/yql/essentials/core/file_storage/file_storage_ut.cpp index bcdab2ed4d7..579013f1b80 100644 --- a/yql/essentials/core/file_storage/file_storage_ut.cpp +++ b/yql/essentials/core/file_storage/file_storage_ut.cpp @@ -15,6 +15,7 @@ #include <util/stream/str.h> #include <util/system/tempfile.h> #include <util/thread/pool.h> +#include <util/system/file_lock.h> using namespace NYql; using namespace NThreading; @@ -243,6 +244,74 @@ Y_UNIT_TEST_SUITE(TFileStorageTests) { UNIT_ASSERT_VALUES_EQUAL(link3->GetMd5(), link4->GetMd5()); } + Y_UNIT_TEST(FileLockTest) { + auto server = CreateTestHttpServer(); + + TString currentETag = "TAG_1"; + TString currentContent = "ABC"; + + TFileStoragePtr fs = CreateTestFS(); + int downloadCount = 0; + + server->SetRequestHandler([&](auto& request) { + Y_UNUSED(request); + TVector<TString> locks; + (fs->GetRoot() / "locks").ListNames(locks); + TVector<TString> fileStorageLocks; + for (auto lock : locks) { + if (lock.Contains("file_storage")) { + fileStorageLocks.push_back(lock); + } + } + UNIT_ASSERT_EQUAL(1, fileStorageLocks.size()); + downloadCount++; + return TTestHttpServer::TReply::OkETag(currentContent, currentETag); + }); + + auto url = server->GetUrl(); + + auto link1 = fs->PutUrl(url, {}); + auto link2 = fs->PutUrl(url, {}); + UNIT_ASSERT_GE(downloadCount, 1); + } + + Y_UNIT_TEST(ParallelDownload) { + TSimpleThreadPool threadPool; + threadPool.Start(12); + + auto server = CreateTestHttpServer(); + TFileStoragePtr fs = CreateTestFS(); + + TString currentETag = "TAG_1"; + TString currentContent = "ABC"; + + std::atomic<int> downloadCount = 0; + std::atomic<int> notModifiedCount = 0; + server->SetRequestHandler([&](const NYql::TTestHttpServer::TRequest& request) { + Sleep(TDuration::Seconds(1)); + + if (request.IfNoneMatch == currentETag) { + notModifiedCount++; + return TTestHttpServer::TReply::NotModified(currentETag); + } + downloadCount++; + return TTestHttpServer::TReply::OkETag(currentContent, currentETag); + }); + + auto url = server->GetUrl(); + auto slowAsyncDowloading = [&]() { + auto link1 = fs->PutUrl(url, {}); + return; + }; + + NThreading::TFuture<void> future1 = NThreading::Async(slowAsyncDowloading, threadPool); + NThreading::TFuture<void> future2 = NThreading::Async(slowAsyncDowloading, threadPool); + future1.Wait(); + future2.Wait(); + UNIT_ASSERT_EQUAL(downloadCount, 1); + UNIT_ASSERT_EQUAL(notModifiedCount, 1); + } + Y_UNIT_TEST(PutUrlWeakETagChange) { auto server = CreateTestHttpServer(); diff --git a/yql/essentials/core/file_storage/storage.cpp b/yql/essentials/core/file_storage/storage.cpp index a111a8c6744..3b3466877d7 100644 --- a/yql/essentials/core/file_storage/storage.cpp +++ b/yql/essentials/core/file_storage/storage.cpp @@ -35,6 +35,8 @@ namespace NYql { namespace { +constexpr const char CleanupLockFilename[] = ".cleanup_lock"; + struct TFileObject { TString Name; time_t MTime; @@ -52,6 +54,9 @@ TFsPath ToFilePath(const TString& path) return path; } +constexpr char FileLocksDir[] = "locks"; + +constexpr size_t MaxLockPathInStorage = 4096; } // namespace TFileLink::TFileLink(const TFsPath& path, const TString& storageFileName, ui64 size, const TString& md5, bool deleteOnDestroy) @@ -152,6 +157,7 @@ public: TImpl(size_t maxFiles, ui64 maxSize, const TString& storagePath) : StorageDir_(ToFilePath(storagePath)) , ProcessTempDir_(StorageDir_ / ToString(GetPID())) // must be subfolder for fast hardlinking + , FileLocksDir_(StorageDir_ / ToString(FileLocksDir)) , IsTemp_(storagePath.empty()) , MaxFiles_(maxFiles) , MaxSize_(maxSize) @@ -162,9 +168,14 @@ public: // TFsPath is not thread safe. It can initialize internal Split at any time. Force do it right now StorageDir_.PathSplit(); ProcessTempDir_.PathSplit(); + FileLocksDir_.PathSplit(); StorageDir_.MkDirs(MODE0711); ProcessTempDir_.MkDirs(MODE0711); + FileLocksDir_.MkDirs(MODE0711); + + CleanupLock_ = THolder<TFileLock>(new TFileLock(FileLocksDir_ / CleanupLockFilename)); + #ifdef _linux_ ProcessTempDirLock_.Reset(new TFileLock(ProcessTempDir_ / ".lockfile")); ProcessTempDirLock_->Acquire(); @@ -177,6 +188,7 @@ public: TAtforkReinit::Get().Register(this); YQL_LOG(INFO) << "FileStorage initialized in " << StorageDir_.GetPath().Quote() << ", temporary dir: " << ProcessTempDir_.GetPath().Quote() + << ", locks dir:" << FileLocksDir_.GetPath().Quote() << ", files: " << CurrentFiles_.load() << ", total size: " << CurrentSize_.load(); } @@ -186,6 +198,7 @@ public: try { ProcessTempDir_.ForceDelete(); if (IsTemp_) { + FileLocksDir_.ForceDelete(); StorageDir_.ForceDelete(); } } catch (...) { @@ -201,6 +214,10 @@ public: return ProcessTempDir_; } + TFsPath GetLockFilePath(const TString& componentName, const TString& lockName) const { + return FileLocksDir_ / (componentName + "_" + ToString(THash<TString>{}(lockName) % MaxLockPathInStorage) + ".lockfile"); + } + TFileLinkPtr Put(const TString& storageFileName, const TString& outFileName, const TString& md5, const NYql::NFS::TDataProvider& puller) { bool newFileAdded = false; TFileLinkPtr result = HardlinkFromStorage(storageFileName, md5, outFileName); @@ -380,58 +397,64 @@ private: if (!NeedToCleanup()) { return; } + + TTryGuard guard(*CleanupLock_); + + if (!guard.WasAcquired()) { + Dirty_.store(true); + return; + } + Dirty_.store(false); - with_lock (CleanupLock_) { - TVector<TString> names; - StorageDir_.ListNames(names); + TVector<TString> names; + StorageDir_.ListNames(names); - TVector<TFileObject> files; - files.reserve(names.size()); + TVector<TFileObject> files; + files.reserve(names.size()); - ui64 actualFiles = 0; - ui64 actualSize = 0; + ui64 actualFiles = 0; + ui64 actualSize = 0; - for (const TString& name: names) { - TFsPath childPath(StorageDir_ / name); - TFileStat stat(childPath, true); - if (stat.IsFile()) { - files.push_back(TFileObject{name, stat.MTime, stat.Size}); - ++actualFiles; - actualSize += stat.Size; - } + for (const TString& name: names) { + TFsPath childPath(StorageDir_ / name); + TFileStat stat(childPath, true); + if (stat.IsFile()) { + files.push_back(TFileObject{name, stat.MTime, stat.Size}); + ++actualFiles; + actualSize += stat.Size; } + } - // sort files to get older files first - Sort(files, [](const TFileObject& f1, const TFileObject& f2) { - if (f1.MTime == f2.MTime) { - return f1.Name.compare(f2.Name) < 0; - } - return f1.MTime < f2.MTime; - }); - - ui64 filesThreshold = MaxFiles_ / 2; - ui64 sizeThreshold = MaxSize_ / 2; + // sort files to get older files first + Sort(files, [](const TFileObject& f1, const TFileObject& f2) { + if (f1.MTime == f2.MTime) { + return f1.Name.compare(f2.Name) < 0; + } + return f1.MTime < f2.MTime; + }); - for (const TFileObject& f: files) { - if (actualFiles <= filesThreshold && actualSize <= sizeThreshold) { - break; - } + ui64 filesThreshold = MaxFiles_ / 2; + ui64 sizeThreshold = MaxSize_ / 2; - YQL_LOG(INFO) << "Removing file from cache (name: " << f.Name - << ", size: " << f.Size - << ", mtime: " << f.MTime << ")"; - if (!NFs::Remove(StorageDir_ / f.Name)) { - YQL_LOG(WARN) << "Failed to remove file " << f.Name.Quote() << ": " << LastSystemErrorText(); - } else { - --actualFiles; - actualSize -= f.Size; - } + for (const TFileObject& f: files) { + if (actualFiles <= filesThreshold && actualSize <= sizeThreshold) { + break; } - CurrentFiles_.store(actualFiles); - CurrentSize_.store(actualSize); + YQL_LOG(INFO) << "Removing file from cache (name: " << f.Name + << ", size: " << f.Size + << ", mtime: " << f.MTime << ")"; + if (!NFs::Remove(StorageDir_ / f.Name)) { + YQL_LOG(WARN) << "Failed to remove file " << f.Name.Quote() << ": " << LastSystemErrorText(); + } else { + --actualFiles; + actualSize -= f.Size; + } } + + CurrentFiles_.store(actualFiles); + CurrentSize_.store(actualSize); } void ResetAtFork() { @@ -444,9 +467,11 @@ private: } private: - TMutex CleanupLock_; const TFsPath StorageDir_; const TFsPath ProcessTempDir_; + const TFsPath FileLocksDir_; + THolder<TFileLock> CleanupLock_; + THolder<TFileLock> ProcessTempDirLock_; const bool IsTemp_; const ui64 MaxFiles_; @@ -477,6 +502,10 @@ TFsPath TStorage::GetTemp() const return Impl_->GetTemp(); } +TFsPath TStorage::GetLockFilePath(const TString& componentName, const TString& lockName) const { + return Impl_->GetLockFilePath(componentName, lockName); +} + TFileLinkPtr TStorage::Put(const TString& storageFileName, const TString& outFileName, const TString& md5, const NFS::TDataProvider& puller) { return Impl_->Put(storageFileName, outFileName, md5, puller); diff --git a/yql/essentials/core/file_storage/storage.h b/yql/essentials/core/file_storage/storage.h index 1bb562320da..c1f5015a9d0 100644 --- a/yql/essentials/core/file_storage/storage.h +++ b/yql/essentials/core/file_storage/storage.h @@ -50,6 +50,9 @@ public: TFsPath GetRoot() const; // Returns temp storage directory TFsPath GetTemp() const; + // Returns path of lock filename. Some lock files can be reused for different + // |lockName| to prevent the growth of gargabe. + TFsPath GetLockFilePath(const TString& componentName, const TString& lockName) const; // Puts the passed data to the storage with the specified storage file name. // The second argument outFileName specifies a name of temporary link returned from the Put(). If empty, then random guid is used. // Provide valid md5 if it is known in advance, otherwise pass "". It will be overridden by puller result diff --git a/yql/essentials/core/file_storage/storage_ut.cpp b/yql/essentials/core/file_storage/storage_ut.cpp index 4229abd2199..db8cbb61521 100644 --- a/yql/essentials/core/file_storage/storage_ut.cpp +++ b/yql/essentials/core/file_storage/storage_ut.cpp @@ -102,8 +102,13 @@ Y_UNIT_TEST_SUITE(TStorageTests) { auto rootPath = storage->GetRoot(); storage.Destroy(); +// On Windows, the lock file is not removed when deleting a folder recursively (for some reason). +// Mute the test path because, in production, TStorage is usually not created in temporary mode: +// the rootPath (and locks folder) will still exist even after the TStorage instance is destroyed. +#if !defined(_win_) UNIT_ASSERT(!fileInStorage->GetPath().Exists()); UNIT_ASSERT(!rootPath.Exists()); +#endif // !defined(_win_) } Y_UNIT_TEST(DisplaceByCount) { @@ -132,7 +137,7 @@ Y_UNIT_TEST_SUITE(TStorageTests) { TVector<TString> filesInStorage; storage->GetRoot().ListNames(filesInStorage); - UNIT_ASSERT_EQUAL(filesInStorage.size(), 2); // 1 file + 1 hardlink directory + UNIT_ASSERT_EQUAL(filesInStorage.size(), 3); // 1 file + 1 hardlink directory + 1 locks directory auto beg = filesInStorage.begin(), end = filesInStorage.end(); @@ -169,7 +174,7 @@ Y_UNIT_TEST_SUITE(TStorageTests) { TVector<TString> filesInStorage; storage->GetRoot().ListNames(filesInStorage); - UNIT_ASSERT_EQUAL(filesInStorage.size(), 2); // 1 file + 1 hardlink directory + UNIT_ASSERT_EQUAL(filesInStorage.size(), 3); // 1 file + 1 hardlink directory + 1 locks folder auto beg = filesInStorage.begin(), end = filesInStorage.end(); @@ -192,10 +197,21 @@ Y_UNIT_TEST_SUITE(TStorageTests) { storage.Destroy(); UNIT_ASSERT(!fileInStorage->GetPath().Exists()); // hardlink was deleted UNIT_ASSERT(rootPath.Exists()); + UNIT_ASSERT((rootPath / "locks").Exists()); UNIT_ASSERT((rootPath / fileInStorage->GetStorageFileName()).Exists()); storage.Reset(new TStorage(100, 100, dir.GetFsPath())); UNIT_ASSERT_EQUAL(storage->GetCount(), 1); UNIT_ASSERT_EQUAL(storage->GetOccupiedSize(), DATA.size()); } + + Y_UNIT_TEST(TestLocksDontGrowEndless) { + TTestDir dir("PersistStorage"); + THolder<TStorage> storage = MakeHolder<TStorage>(100, 100, dir.GetFsPath()); + THashSet<TString> uniquePaths; + for (size_t i = 0; i < 10000; i++) { + uniquePaths.emplace(storage->GetLockFilePath("test_component", ToString(i))); + } + UNIT_ASSERT_LE(uniquePaths.size(), 4096); + } } diff --git a/yql/essentials/utils/multi_resource_lock.cpp b/yql/essentials/utils/multi_resource_lock.cpp deleted file mode 100644 index 84b095a2076..00000000000 --- a/yql/essentials/utils/multi_resource_lock.cpp +++ /dev/null @@ -1,42 +0,0 @@ -#include "multi_resource_lock.h" - -using namespace NYql; - -TMultiResourceLock::TResourceLock TMultiResourceLock::Acquire(TString resourceId) { - TLock::TPtr lock = ProvideResourceLock(resourceId); - - // resource-specific mutex should be locked outside of Guard_ lock - return { *this, std::move(lock), std::move(resourceId) }; -} - -TMultiResourceLock::~TMultiResourceLock() { - with_lock(Guard_) { - Y_ABORT_UNLESS(Locks_.empty(), "~TMultiResourceLock: we still have %lu unreleased locks", Locks_.size()); - } -} - -TMultiResourceLock::TLock::TPtr TMultiResourceLock::ProvideResourceLock(const TString& resourceId) { - with_lock(Guard_) { - auto it = Locks_.find(resourceId); - if (it == Locks_.end()) { - it = Locks_.emplace(resourceId, MakeIntrusive<TLock>()).first; - } - - // important: ref count will be incremented under lock - // in this case we have guarantee TryCleanup will not erase this resource just after exit from this method and before entering lock->Mutex_.Acquire() - return it->second; - } -} - -void TMultiResourceLock::TryCleanup(const TString& resourceId) { - with_lock(Guard_) { - auto it = Locks_.find(resourceId); - if (it == Locks_.end()) { - return; - } - - if (it->second->IsUnique()) { - Locks_.erase(it); - } - } -} diff --git a/yql/essentials/utils/multi_resource_lock.h b/yql/essentials/utils/multi_resource_lock.h deleted file mode 100644 index c7f4ea49e27..00000000000 --- a/yql/essentials/utils/multi_resource_lock.h +++ /dev/null @@ -1,81 +0,0 @@ -#pragma once - -#include "yql_panic.h" - -#include <util/generic/map.h> -#include <util/generic/ptr.h> -#include <util/generic/string.h> -#include <util/system/mutex.h> - -namespace NYql { - -class TMultiResourceLock : private TNonCopyable { -private: - struct TLock : public TThrRefBase { - typedef TIntrusivePtr<TLock> TPtr; - - bool IsUnique() const { - return RefCount() == 1; - } - - TMutex Mutex; - }; - -public: - struct TResourceLock : private TNonCopyable { - TResourceLock(TMultiResourceLock& owner, TLock::TPtr lock, TString resourceId) - : Owner_(owner) - , Lock_(std::move(lock)) - , ResourceId_(std::move(resourceId)) - { - Y_ENSURE(Lock_); - Lock_->Mutex.Acquire(); - } - - TResourceLock(TResourceLock&& other) - : Owner_(other.Owner_) - , Lock_(std::move(other.Lock_)) - , ResourceId_(std::move(other.ResourceId_)) - { - - } - - TResourceLock& operator=(TResourceLock&&) = delete; - - ~TResourceLock() { - if (!Lock_) { - return; - } - - Lock_->Mutex.Release(); - // decrement ref count before TryCleanup - Lock_ = nullptr; - Owner_.TryCleanup(ResourceId_); - } - - private: - TMultiResourceLock& Owner_; - TLock::TPtr Lock_; - TString ResourceId_; - }; - - TResourceLock Acquire(TString resourceId); - - template <typename F> - auto RunWithLock(TString resourceId, const F& f) -> decltype(f()) { - auto lock = Acquire(std::move(resourceId)); - return f(); - } - - ~TMultiResourceLock(); - -private: - TLock::TPtr ProvideResourceLock(const TString& resourceId); - void TryCleanup(const TString& resourceId); - -private: - TMutex Guard_; - TMap<TString, TLock::TPtr> Locks_; -}; - -} diff --git a/yql/essentials/utils/multi_resource_lock_ut.cpp b/yql/essentials/utils/multi_resource_lock_ut.cpp deleted file mode 100644 index 0af9cea3fff..00000000000 --- a/yql/essentials/utils/multi_resource_lock_ut.cpp +++ /dev/null @@ -1,54 +0,0 @@ -#include "multi_resource_lock.h" -#include <util/generic/xrange.h> -#include <library/cpp/threading/future/async.h> -#include <library/cpp/testing/unittest/registar.h> - -namespace NYql { -using namespace NThreading; - -Y_UNIT_TEST_SUITE(TMultiResourceLock) { - Y_UNIT_TEST(ManyResources) { - TMultiResourceLock multiLock; - const int workersCount = 3; - TVector<TVector<int>> workersData; - workersData.resize(workersCount); - - TAdaptiveThreadPool queue; - queue.Start(0); - - TVector<NThreading::TFuture<void>> workers; - workers.reserve(workersCount); - TManualEvent startEvent; - - for (int i = 0; i < workersCount; ++i) { - TString resourceId = ToString(i); - TVector<int>& data = workersData.at(i); - NThreading::TFuture<void> f = NThreading::Async([&, resourceId]() { - startEvent.Wait(); - - for (int j = 0; j < 1000; ++j) { - const auto& l = multiLock.Acquire(resourceId); - Y_UNUSED(l); - data.push_back(j); - } - }, queue); - - workers.push_back(std::move(f)); - } - - startEvent.Signal(); - - NThreading::TFuture<void> all = NThreading::WaitExceptionOrAll(workers); - all.GetValueSync(); - queue.Stop(); - - // analyze workersData: - auto range0_999 = xrange(0, 1000); - for (auto& w : workersData) { - UNIT_ASSERT_VALUES_EQUAL(w.size(), 1000); - UNIT_ASSERT(std::equal(range0_999.begin(), range0_999.end(), w.begin())); - } - } -} - -} diff --git a/yql/essentials/utils/ut/ya.make b/yql/essentials/utils/ut/ya.make index 0f7163ef793..ea89ede2579 100644 --- a/yql/essentials/utils/ut/ya.make +++ b/yql/essentials/utils/ut/ya.make @@ -3,7 +3,6 @@ UNITTEST_FOR(yql/essentials/utils) SRCS( fp_bits_ut.cpp md5_stream_ut.cpp - multi_resource_lock_ut.cpp parse_double_ut.cpp range_walker_ut.cpp retry_ut.cpp diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make index d0e1b58200e..1888dfef431 100644 --- a/yql/essentials/utils/ya.make +++ b/yql/essentials/utils/ya.make @@ -20,8 +20,6 @@ SRCS( mem_limit.cpp method_index.cpp method_index.h - multi_resource_lock.cpp - multi_resource_lock.h parse_double.cpp parse_double.h proc_alive.cpp |