diff options
author | Roman Udovichenko <rvu@ydb.tech> | 2024-01-09 17:40:08 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-09 17:40:08 +0300 |
commit | eee3fb563609cf04464365724d4badf54b84c771 (patch) | |
tree | d51d6fcbf4edb852f9ce524e3759909075257d49 | |
parent | 5232e812727841f020a7c7f4a44041520478069f (diff) | |
download | ydb-eee3fb563609cf04464365724d4badf54b84c771.tar.gz |
[yql] Refresh file storage state at fork (YQL-17461) (#878)
* [yql] Refresh file storage state at fork (YQL-17461)
* Add `const` to NeedToCleanup()
-rw-r--r-- | ydb/library/yql/core/file_storage/storage.cpp | 47 |
1 files changed, 28 insertions, 19 deletions
diff --git a/ydb/library/yql/core/file_storage/storage.cpp b/ydb/library/yql/core/file_storage/storage.cpp index 51c8e42236..647ab04636 100644 --- a/ydb/library/yql/core/file_storage/storage.cpp +++ b/ydb/library/yql/core/file_storage/storage.cpp @@ -21,6 +21,7 @@ #include <util/system/thread.h> #include <functional> +#include <atomic> #if defined(_unix_) #include <pthread.h> @@ -135,7 +136,7 @@ public: private: void Reinit() { for (auto& v : Registered) { - v.ResetRandom(); + v.ResetAtFork(); } } @@ -153,6 +154,9 @@ public: , IsTemp(storagePath.empty()) , MaxFiles(maxFiles) , MaxSize(maxSize) + , CurrentFiles(0) + , CurrentSize(0) + , Dirty(false) { // TFsPath is not thread safe. It can initialize internal Split at any time. Force do it right now StorageDir.PathSplit(); @@ -172,8 +176,8 @@ public: TAtforkReinit::Get().Register(this); YQL_LOG(INFO) << "FileStorage initialized in " << StorageDir.GetPath().Quote() << ", temporary dir: " << ProcessTempDir.GetPath().Quote() - << ", files: " << CurrentFiles - << ", total size: " << CurrentSize; + << ", files: " << CurrentFiles.load() + << ", total size: " << CurrentSize.load(); } ~TImpl() { @@ -219,8 +223,8 @@ public: SetCacheFilePermissionsNoThrow(hardlinkFile); if (NFs::HardLink(hardlinkFile, storageFile)) { - AtomicIncrement(CurrentFiles); - AtomicAdd(CurrentSize, fileSize); + ++CurrentFiles; + CurrentSize += fileSize; } // Ignore HardLink fail. Another process managed to download before us TouchFile(storageFile.c_str()); @@ -281,10 +285,10 @@ public: const i64 newFileSize = Max<i64>(0, GetFileLength(dstStorageFile.c_str())); if (!prevFileExisted) { - AtomicIncrement(CurrentFiles); + ++CurrentFiles; } - AtomicAdd(CurrentSize, newFileSize - prevFileSize); + CurrentSize += newFileSize - prevFileSize; } bool RemoveFromStorage(const TString& existingStorageFileName) { @@ -300,19 +304,19 @@ public: const bool result = NFs::Remove(storageFile); if (result || !storageFile.Exists()) { - AtomicDecrement(CurrentFiles); - AtomicAdd(CurrentSize, -prevFileSize); + ++CurrentFiles; + CurrentSize -= prevFileSize; } return result; } ui64 GetOccupiedSize() const { - return AtomicGet(CurrentSize); + return CurrentSize.load(); } size_t GetCount() const { - return AtomicGet(CurrentFiles); + return CurrentFiles.load(); } TString GetTempName() { @@ -365,15 +369,17 @@ private: CurrentSize = actualSize; } - bool NeedToCleanup() { - return static_cast<ui64>(AtomicGet(CurrentFiles)) > MaxFiles || - static_cast<ui64>(AtomicGet(CurrentSize)) > MaxSize; + bool NeedToCleanup() const { + return Dirty.load() + || static_cast<ui64>(CurrentFiles.load()) > MaxFiles + || static_cast<ui64>(CurrentSize.load()) > MaxSize; } void Cleanup() { if (!NeedToCleanup()) { return; } + Dirty.store(false); with_lock (CleanupLock) { TVector<TString> names; @@ -422,15 +428,17 @@ private: } } - AtomicSet(CurrentFiles, actualFiles); - AtomicSet(CurrentSize, actualSize); + CurrentFiles.store(actualFiles); + CurrentSize.store(actualSize); } } - void ResetRandom() { + void ResetAtFork() { with_lock(RndLock) { Rnd.ResetSeed(); } + // Force cleanup on next file add, because other processes may change the state + Dirty.store(true); } private: @@ -441,8 +449,9 @@ private: const bool IsTemp; const ui64 MaxFiles; const ui64 MaxSize; - TAtomic CurrentFiles = 0; - TAtomic CurrentSize = 0; + std::atomic<i64> CurrentFiles = 0; + std::atomic<i64> CurrentSize = 0; + std::atomic_bool Dirty; TMutex RndLock; TRandGuid Rnd; }; |