aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoman Udovichenko <rvu@ydb.tech>2024-01-09 17:40:08 +0300
committerGitHub <noreply@github.com>2024-01-09 17:40:08 +0300
commiteee3fb563609cf04464365724d4badf54b84c771 (patch)
treed51d6fcbf4edb852f9ce524e3759909075257d49
parent5232e812727841f020a7c7f4a44041520478069f (diff)
downloadydb-eee3fb563609cf04464365724d4badf54b84c771.tar.gz
[yql] Refresh file storage state at fork (YQL-17461) (#878)
* [yql] Refresh file storage state at fork (YQL-17461) * Add `const` to NeedToCleanup()
-rw-r--r--ydb/library/yql/core/file_storage/storage.cpp47
1 files changed, 28 insertions, 19 deletions
diff --git a/ydb/library/yql/core/file_storage/storage.cpp b/ydb/library/yql/core/file_storage/storage.cpp
index 51c8e42236..647ab04636 100644
--- a/ydb/library/yql/core/file_storage/storage.cpp
+++ b/ydb/library/yql/core/file_storage/storage.cpp
@@ -21,6 +21,7 @@
#include <util/system/thread.h>
#include <functional>
+#include <atomic>
#if defined(_unix_)
#include <pthread.h>
@@ -135,7 +136,7 @@ public:
private:
void Reinit() {
for (auto& v : Registered) {
- v.ResetRandom();
+ v.ResetAtFork();
}
}
@@ -153,6 +154,9 @@ public:
, IsTemp(storagePath.empty())
, MaxFiles(maxFiles)
, MaxSize(maxSize)
+ , CurrentFiles(0)
+ , CurrentSize(0)
+ , Dirty(false)
{
// TFsPath is not thread safe. It can initialize internal Split at any time. Force do it right now
StorageDir.PathSplit();
@@ -172,8 +176,8 @@ public:
TAtforkReinit::Get().Register(this);
YQL_LOG(INFO) << "FileStorage initialized in " << StorageDir.GetPath().Quote()
<< ", temporary dir: " << ProcessTempDir.GetPath().Quote()
- << ", files: " << CurrentFiles
- << ", total size: " << CurrentSize;
+ << ", files: " << CurrentFiles.load()
+ << ", total size: " << CurrentSize.load();
}
~TImpl() {
@@ -219,8 +223,8 @@ public:
SetCacheFilePermissionsNoThrow(hardlinkFile);
if (NFs::HardLink(hardlinkFile, storageFile)) {
- AtomicIncrement(CurrentFiles);
- AtomicAdd(CurrentSize, fileSize);
+ ++CurrentFiles;
+ CurrentSize += fileSize;
}
// Ignore HardLink fail. Another process managed to download before us
TouchFile(storageFile.c_str());
@@ -281,10 +285,10 @@ public:
const i64 newFileSize = Max<i64>(0, GetFileLength(dstStorageFile.c_str()));
if (!prevFileExisted) {
- AtomicIncrement(CurrentFiles);
+ ++CurrentFiles;
}
- AtomicAdd(CurrentSize, newFileSize - prevFileSize);
+ CurrentSize += newFileSize - prevFileSize;
}
bool RemoveFromStorage(const TString& existingStorageFileName) {
@@ -300,19 +304,19 @@ public:
const bool result = NFs::Remove(storageFile);
if (result || !storageFile.Exists()) {
- AtomicDecrement(CurrentFiles);
- AtomicAdd(CurrentSize, -prevFileSize);
+ ++CurrentFiles;
+ CurrentSize -= prevFileSize;
}
return result;
}
ui64 GetOccupiedSize() const {
- return AtomicGet(CurrentSize);
+ return CurrentSize.load();
}
size_t GetCount() const {
- return AtomicGet(CurrentFiles);
+ return CurrentFiles.load();
}
TString GetTempName() {
@@ -365,15 +369,17 @@ private:
CurrentSize = actualSize;
}
- bool NeedToCleanup() {
- return static_cast<ui64>(AtomicGet(CurrentFiles)) > MaxFiles ||
- static_cast<ui64>(AtomicGet(CurrentSize)) > MaxSize;
+ bool NeedToCleanup() const {
+ return Dirty.load()
+ || static_cast<ui64>(CurrentFiles.load()) > MaxFiles
+ || static_cast<ui64>(CurrentSize.load()) > MaxSize;
}
void Cleanup() {
if (!NeedToCleanup()) {
return;
}
+ Dirty.store(false);
with_lock (CleanupLock) {
TVector<TString> names;
@@ -422,15 +428,17 @@ private:
}
}
- AtomicSet(CurrentFiles, actualFiles);
- AtomicSet(CurrentSize, actualSize);
+ CurrentFiles.store(actualFiles);
+ CurrentSize.store(actualSize);
}
}
- void ResetRandom() {
+ void ResetAtFork() {
with_lock(RndLock) {
Rnd.ResetSeed();
}
+ // Force cleanup on next file add, because other processes may change the state
+ Dirty.store(true);
}
private:
@@ -441,8 +449,9 @@ private:
const bool IsTemp;
const ui64 MaxFiles;
const ui64 MaxSize;
- TAtomic CurrentFiles = 0;
- TAtomic CurrentSize = 0;
+ std::atomic<i64> CurrentFiles = 0;
+ std::atomic<i64> CurrentSize = 0;
+ std::atomic_bool Dirty;
TMutex RndLock;
TRandGuid Rnd;
};