aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Whompe <vladl2802@yandex.ru> 2024-07-30 18:22:53 +0300
committer GitHub <noreply@github.com> 2024-07-30 17:22:53 +0200
commit c0f743df2cd1cef5a16ab4df8a6bb08a8352a2fd (patch)
tree 8219714887f56952d38d77d7aadb4b6cd8aee875
parent ee419dbcdec5f5bac819a17618598ca2cf13e4c2 (diff)
download ydb-c0f743df2cd1cef5a16ab4df8a6bb08a8352a2fd.tar.gz
Remove old spilling tmp files (#7108)
Co-authored-by: Vladislav Lukachik <vladluk@yandex-team.ru>
-rw-r--r-- ydb/core/kqp/proxy_service/kqp_proxy_service.cpp 6
-rw-r--r-- ydb/library/yql/dq/actors/spilling/spilling_file.cpp 74
-rw-r--r-- ydb/library/yql/dq/actors/spilling/spilling_file.h 5
-rw-r--r-- ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp 15
-rw-r--r-- ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp 5
5 files changed, 94 insertions, 11 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 69da792d86..36905b94ad 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -44,7 +44,8 @@
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/resource/resource.h>
-#include <util/generic/guid.h>
+
+#include <util/folder/dirut.h>
namespace NKikimr::NKqp {
@@ -236,7 +237,8 @@ public:
if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
TString spillingRoot = cfg.GetRoot();
if (spillingRoot.empty()) {
- spillingRoot = TStringBuilder() << "/tmp/ydb_spilling_" << CreateGuidAsString() << "/";
+ spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser();
+ MakeDirIfNotExist(spillingRoot);
}
SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
index 9975e7d7f2..304433a264 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
@@ -13,6 +13,11 @@
#include <util/folder/path.h>
#include <util/stream/file.h>
#include <util/thread/pool.h>
+#include <util/generic/guid.h>
+#include <util/folder/iterator.h>
+#include <util/generic/vector.h>
+#include <util/folder/dirut.h>
+#include <util/system/user.h>
namespace NYql::NDq {
@@ -159,6 +164,7 @@ private:
EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1,
EvWriteFileResponse,
EvReadFileResponse,
+ EvRemoveOldTmp,
LastEvent
};
@@ -189,6 +195,15 @@ private:
bool Removed = false;
TMaybe<TString> Error;
};
+
+ // Local event carrying everything needed to clean up spilling directories left by
+ // previous sessions: the root to scan, the owning node id and the id of the current
+ // session (whose directory must be kept).
+ struct TEvRemoveOldTmp : public TEventLocal<TEvRemoveOldTmp, EvRemoveOldTmp> {
+     // All arguments are taken by value and moved into the corresponding fields.
+     TEvRemoveOldTmp(TFsPath tmpRoot, ui32 nodeId, TString spillingSessionId)
+         : TmpRoot(std::move(tmpRoot))
+         , NodeId(nodeId)
+         , SpillingSessionId(std::move(spillingSessionId))
+     {
+     }
+
+     TFsPath TmpRoot;            // directory whose direct children are inspected
+     ui32 NodeId;                // node id encoded in the directory names
+     TString SpillingSessionId;  // session id of the directory that must not be removed
+ };
};
struct TFileDesc;
@@ -206,8 +221,11 @@ public:
void Bootstrap() {
Root_ = Config_.Root;
- Root_ /= (TStringBuilder() << "node_" << SelfId().NodeId());
+ const auto rootToRemoveOldTmp = Root_;
+ const auto sessionId = Config_.SpillingSessionId;
+ const auto nodeId = SelfId().NodeId();
+ Root_ /= (TStringBuilder() << NodePrefix_ << "_" << nodeId << "_" << sessionId);
LOG_I("Init DQ local file spilling service at " << Root_ << ", actor: " << SelfId());
try {
@@ -221,6 +239,8 @@ public:
Become(&TDqLocalFileSpillingService::BrokenState);
return;
}
+
+ Send(SelfId(), MakeHolder<TEvPrivate::TEvRemoveOldTmp>(rootToRemoveOldTmp, nodeId, sessionId));
Become(&TDqLocalFileSpillingService::WorkState);
}
@@ -271,6 +291,7 @@ private:
hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork)
hFunc(TEvDqSpilling::TEvRead, HandleWork)
hFunc(TEvPrivate::TEvReadFileResponse, HandleWork)
+ hFunc(TEvPrivate::TEvRemoveOldTmp, HandleWork)
hFunc(NMon::TEvHttpInfo, HandleWork)
cFunc(TEvents::TEvPoison::EventType, PassAway)
);
@@ -712,6 +733,50 @@ private:
Send(ev->Sender, new NMon::TEvHttpInfoRes(s.Str()));
}
+ // Best-effort cleanup of spilling directories left behind by earlier sessions of this node.
+ //
+ // Scans msg.TmpRoot (one level deep) and force-deletes every entry whose name has the form
+ // <NodePrefix_>_<NodeId>_<session id> with a session id different from msg.SpillingSessionId.
+ // yexception-derived failures are logged and swallowed: a failed scan removes nothing, and a
+ // failed deletion of one directory does not prevent the remaining ones from being removed.
+ void HandleWork(TEvPrivate::TEvRemoveOldTmp::TPtr& ev) {
+     const auto& msg = *ev->Get();
+     const auto& root = msg.TmpRoot;
+     const auto nodeIdString = ToString(msg.NodeId);
+     const auto& sessionId = msg.SpillingSessionId;
+     const auto& nodePrefix = this->NodePrefix_;
+
+     LOG_I("[RemoveOldTmp] removing at root: " << root);
+
+     const auto isDirOldTmp = [&nodePrefix, &nodeIdString, &sessionId](const TString& dirName) -> bool {
+         // dirName: node_<nodeId>_<sessionId>
+         // At most three parts are collected, so the last part holds the whole session id.
+         TVector<TString> parts;
+         StringSplitter(dirName).Split('_').Limit(3).Collect(&parts);
+
+         if (parts.size() < 3) {
+             return false;
+         }
+         return parts[0] == nodePrefix && parts[1] == nodeIdString && parts[2] != sessionId;
+     };
+
+     // Phase 1: collect the names first, so nothing is deleted while the iterator is still walking.
+     TVector<TString> oldTmps;
+     try {
+         TDirIterator iter(root, TDirIterator::TOptions().SetMaxLevel(1));
+
+         for (const auto& dirEntry : iter) {
+             // FTS_DP is the post-order visit of a directory; skip it so that a directory
+             // is not considered twice.
+             if (dirEntry.fts_info == FTS_DP) {
+                 continue;
+             }
+
+             // Build the owning string once; fts_name itself is a raw character buffer.
+             TString dirName(dirEntry.fts_name);
+             if (isDirOldTmp(dirName)) {
+                 LOG_D("[RemoveOldTmp] found old temporary at " << (root / dirName));
+                 oldTmps.emplace_back(std::move(dirName));
+             }
+         }
+     } catch (const yexception& e) {
+         LOG_E("[RemoveOldTmp] removing failed due to: " << e.what());
+         return;
+     }
+
+     // Phase 2: delete each stale directory independently, so one failure (e.g. a permission
+     // problem on a single directory) does not leave all the others behind.
+     for (const auto& dirName : oldTmps) {
+         TFsPath oldTmpPath = root / dirName;
+         try {
+             oldTmpPath.ForceDelete();
+         } catch (const yexception& e) {
+             LOG_E("[RemoveOldTmp] removing of " << oldTmpPath << " failed due to: " << e.what());
+         }
+     }
+ }
+
+
private:
void RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
if (fd.HasActiveOp) {
@@ -941,6 +1006,7 @@ private:
private:
const TFileSpillingServiceConfig Config_;
+ const TString NodePrefix_ = "node";
TFsPath Root_;
TIntrusivePtr<TSpillingCounters> Counters_;
@@ -952,6 +1018,12 @@ private:
} // anonymous namespace
+// Builds the default spilling root for the current user:
+// <system temp dir>/spilling-tmp-<username>.
+// Only the path is computed here; the function does not create the directory.
+TFsPath GetTmpSpillingRootForCurrentUser() {
+    const TString userDirName = "spilling-tmp-" + GetUsername();
+    return TFsPath{GetSystemTempDir()} / userDirName;
+}
+
IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const TActorId& client,
bool removeBlobsAfterRead)
{
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.h b/ydb/library/yql/dq/actors/spilling/spilling_file.h
index ebf6a0b6c8..55fe819146 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_file.h
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file.h
@@ -7,11 +7,14 @@
#include <util/system/types.h>
#include <util/generic/strbuf.h>
+#include <util/folder/path.h>
+#include <util/generic/guid.h>
namespace NYql::NDq {
struct TFileSpillingServiceConfig {
TString Root;
+ TString SpillingSessionId = CreateGuidAsString();
ui64 MaxTotalSize = 0;
ui64 MaxFileSize = 0;
ui64 MaxFilePartSize = 0;
@@ -26,6 +29,8 @@ inline NActors::TActorId MakeDqLocalFileSpillingServiceID(ui32 nodeId) {
return NActors::TActorId(nodeId, TStringBuf(name, 12));
}
+TFsPath GetTmpSpillingRootForCurrentUser();
+
NActors::IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const NActors::TActorId& client, bool removeBlobsAfterRead);
NActors::IActor* CreateDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, TIntrusivePtr<TSpillingCounters> counters);
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
index fc98fac663..6fd1be5aff 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
@@ -48,13 +48,19 @@ public:
return str;
}
+ const TString& GetSpillingSessionId() const { // Session id assigned by the latest StartSpillingService() call.
+ return SpillingSessionId_;
+ }
+
TActorId StartSpillingService(ui64 maxTotalSize = 1000, ui64 maxFileSize = 500,
ui64 maxFilePartSize = 100, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix())
{
SpillingRoot_ = root;
+ SpillingSessionId_ = CreateGuidAsString();
auto config = TFileSpillingServiceConfig{
.Root = root.GetPath(),
+ .SpillingSessionId = SpillingSessionId_,
.MaxTotalSize = maxTotalSize,
.MaxFileSize = maxFileSize,
.MaxFilePartSize = maxFilePartSize
@@ -91,6 +97,7 @@ public:
private:
TFsPath SpillingRoot_;
+ TString SpillingSessionId_;
};
TBuffer CreateBlob(ui32 size, char symbol) {
@@ -303,8 +310,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
auto spillingActor = runtime.StartSpillingActor(tester);
runtime.WaitBootstrap();
-
- const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_";
+ const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "_" << runtime.GetSpillingSessionId() << "/1_test_";
for (ui32 i = 0; i < 5; ++i) {
// Cerr << "---- store blob #" << i << Endl;
@@ -346,7 +352,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
runtime.WaitBootstrap();
- const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_";
+ const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "_" << runtime.GetSpillingSessionId() << "/1_test_";
for (ui32 i = 0; i < 5; ++i) {
// Cerr << "---- store blob #" << i << Endl;
@@ -393,8 +399,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId);
}
-
- auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId()));
+ auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId()) + "_" + runtime.GetSpillingSessionId());
(runtime.GetSpillingRoot() / nodePath / "1_test_0").ForceDelete();
{
diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
index 8b87f8aebc..7c4f592454 100644
--- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
+++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp
@@ -80,9 +80,8 @@ public:
TActorSetupCmd(resman, TMailboxType::Simple, 0));
if (withSpilling) {
- char tempDir[MAX_PATH];
- if (MakeTempDir(tempDir, nullptr) != 0)
- ythrow yexception() << "LocalServiceHolder: Can't create temporary directory " << tempDir;
+ auto tempDir = NDq::GetTmpSpillingRootForCurrentUser();
+ MakeDirIfNotExist(tempDir);
auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = true}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));