diff options
author | Whompe <vladl2802@yandex.ru> | 2024-07-30 18:22:53 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-30 17:22:53 +0200 |
commit | c0f743df2cd1cef5a16ab4df8a6bb08a8352a2fd (patch) | |
tree | 8219714887f56952d38d77d7aadb4b6cd8aee875 | |
parent | ee419dbcdec5f5bac819a17618598ca2cf13e4c2 (diff) | |
download | ydb-c0f743df2cd1cef5a16ab4df8a6bb08a8352a2fd.tar.gz |
Remove old spilling tmp files (#7108)
Co-authored-by: Vladislav Lukachik <vladluk@yandex-team.ru>
5 files changed, 94 insertions, 11 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 69da792d86..36905b94ad 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -44,7 +44,8 @@ #include <library/cpp/lwtrace/mon/mon_lwtrace.h> #include <library/cpp/monlib/service/pages/templates.h> #include <library/cpp/resource/resource.h> -#include <util/generic/guid.h> + +#include <util/folder/dirut.h> namespace NKikimr::NKqp { @@ -236,7 +237,8 @@ public: if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) { TString spillingRoot = cfg.GetRoot(); if (spillingRoot.empty()) { - spillingRoot = TStringBuilder() << "/tmp/ydb_spilling_" << CreateGuidAsString() << "/"; + spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser(); + MakeDirIfNotExist(spillingRoot); } SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService( diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp index 9975e7d7f2..304433a264 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp @@ -13,6 +13,11 @@ #include <util/folder/path.h> #include <util/stream/file.h> #include <util/thread/pool.h> +#include <util/generic/guid.h> +#include <util/folder/iterator.h> +#include <util/generic/vector.h> +#include <util/folder/dirut.h> +#include <util/system/user.h> namespace NYql::NDq { @@ -159,6 +164,7 @@ private: EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1, EvWriteFileResponse, EvReadFileResponse, + EvRemoveOldTmp, LastEvent }; @@ -189,6 +195,15 @@ private: bool Removed = false; TMaybe<TString> Error; }; + + struct TEvRemoveOldTmp : public TEventLocal<TEvRemoveOldTmp, EvRemoveOldTmp> { + TFsPath TmpRoot; + ui32 NodeId; + TString SpillingSessionId; + + TEvRemoveOldTmp(TFsPath tmpRoot, ui32 nodeId, TString spillingSessionId) + : TmpRoot(std::move(tmpRoot)), NodeId(nodeId), SpillingSessionId(std::move(spillingSessionId)) {} + }; }; struct TFileDesc; @@ -206,8 +221,11 @@ public: void Bootstrap() { Root_ = Config_.Root; - Root_ /= (TStringBuilder() << "node_" << SelfId().NodeId()); + const auto rootToRemoveOldTmp = Root_; + const auto sessionId = Config_.SpillingSessionId; + const auto nodeId = SelfId().NodeId(); + Root_ /= (TStringBuilder() << NodePrefix_ << "_" << nodeId << "_" << sessionId); LOG_I("Init DQ local file spilling service at " << Root_ << ", actor: " << SelfId()); try { @@ -221,6 +239,8 @@ public: Become(&TDqLocalFileSpillingService::BrokenState); return; } + + Send(SelfId(), MakeHolder<TEvPrivate::TEvRemoveOldTmp>(rootToRemoveOldTmp, nodeId, sessionId)); Become(&TDqLocalFileSpillingService::WorkState); } @@ -271,6 +291,7 @@ private: hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork) hFunc(TEvDqSpilling::TEvRead, HandleWork) hFunc(TEvPrivate::TEvReadFileResponse, HandleWork) + hFunc(TEvPrivate::TEvRemoveOldTmp, HandleWork) hFunc(NMon::TEvHttpInfo, HandleWork) cFunc(TEvents::TEvPoison::EventType, PassAway) ); @@ -712,6 +733,50 @@ private: Send(ev->Sender, new NMon::TEvHttpInfoRes(s.Str())); } + void HandleWork(TEvPrivate::TEvRemoveOldTmp::TPtr& ev) { + const auto& msg = *ev->Get(); + const auto& root = msg.TmpRoot; + const auto nodeIdString = ToString(msg.NodeId); + const auto& sessionId = msg.SpillingSessionId; + const auto& nodePrefix = this->NodePrefix_; + + LOG_I("[RemoveOldTmp] removing at root: " << root); + + const auto isDirOldTmp = [&nodePrefix, &nodeIdString, &sessionId](const TString& dirName) -> bool { + // dirName: node_<nodeId>_<sessionId> + TVector<TString> parts; + StringSplitter(dirName).Split('_').Limit(3).Collect(&parts); + + if (parts.size() < 3) { + return false; + } + return parts[0] == nodePrefix && parts[1] == nodeIdString && parts[2] != sessionId; + }; + + try { + TDirIterator iter(root, TDirIterator::TOptions().SetMaxLevel(1)); + + TVector<TString> oldTmps; + for (const auto& dirEntry : iter) { + if (dirEntry.fts_info == FTS_DP) { + continue; + } + + const auto dirName = dirEntry.fts_name; + if (isDirOldTmp(dirName)) { + LOG_D("[RemoveOldTmp] found old temporary at " << (root / dirName)); + oldTmps.emplace_back(std::move(dirName)); + } + } + + for (const auto& dirName : oldTmps) { + (root / dirName).ForceDelete(); + } + } catch (const yexception& e) { + LOG_E("[RemoveOldTmp] removing failed due to: " << e.what()); + } + } + private: void RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) { if (fd.HasActiveOp) { @@ -941,6 +1006,7 @@ private: private: const TFileSpillingServiceConfig Config_; + const TString NodePrefix_ = "node"; TFsPath Root_; TIntrusivePtr<TSpillingCounters> Counters_; @@ -952,6 +1018,12 @@ private: } // anonymous namespace +TFsPath GetTmpSpillingRootForCurrentUser() { + auto root = TFsPath{GetSystemTempDir()}; + root /= "spilling-tmp-" + GetUsername(); + return root; +} + IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const TActorId& client, bool removeBlobsAfterRead) { diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.h b/ydb/library/yql/dq/actors/spilling/spilling_file.h index ebf6a0b6c8..55fe819146 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file.h +++ b/ydb/library/yql/dq/actors/spilling/spilling_file.h @@ -7,11 +7,14 @@ #include <util/system/types.h> #include <util/generic/strbuf.h> +#include <util/folder/path.h> +#include <util/generic/guid.h> namespace NYql::NDq { struct TFileSpillingServiceConfig { TString Root; + TString SpillingSessionId = CreateGuidAsString(); ui64 MaxTotalSize = 0; ui64 MaxFileSize = 0; ui64 MaxFilePartSize = 0; @@ -26,6 +29,8 @@ inline NActors::TActorId MakeDqLocalFileSpillingServiceID(ui32 nodeId) { return NActors::TActorId(nodeId, TStringBuf(name, 12)); } +TFsPath GetTmpSpillingRootForCurrentUser(); + NActors::IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const NActors::TActorId& client, bool removeBlobsAfterRead); NActors::IActor* CreateDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, TIntrusivePtr<TSpillingCounters> counters); diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp index fc98fac663..6fd1be5aff 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp @@ -48,13 +48,19 @@ public: return str; } + const TString& GetSpillingSessionId() const { + return SpillingSessionId_; + } + TActorId StartSpillingService(ui64 maxTotalSize = 1000, ui64 maxFileSize = 500, ui64 maxFilePartSize = 100, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix()) { SpillingRoot_ = root; + SpillingSessionId_ = CreateGuidAsString(); auto config = TFileSpillingServiceConfig{ .Root = root.GetPath(), + .SpillingSessionId = SpillingSessionId_, .MaxTotalSize = maxTotalSize, .MaxFileSize = maxFileSize, .MaxFilePartSize = maxFilePartSize @@ -91,6 +97,7 @@ public: private: TFsPath SpillingRoot_; + TString SpillingSessionId_; }; TBuffer CreateBlob(ui32 size, char symbol) { @@ -303,8 +310,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { auto spillingActor = runtime.StartSpillingActor(tester); runtime.WaitBootstrap(); - - const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_"; + const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "_" << runtime.GetSpillingSessionId() << "/1_test_"; for (ui32 i = 0; i < 5; ++i) { // Cerr << "---- store blob #" << i << Endl; @@ -346,7 +352,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { runtime.WaitBootstrap(); - const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_"; + const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "_" << runtime.GetSpillingSessionId() << "/1_test_"; for (ui32 i = 0; i < 5; ++i) { // Cerr << "---- store blob #" << i << Endl; @@ -393,8 +399,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester); UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId); } - - auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId())); + auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId()) + "_" + runtime.GetSpillingSessionId()); (runtime.GetSpillingRoot() / nodePath / "1_test_0").ForceDelete(); { diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index 8b87f8aebc..7c4f592454 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -80,9 +80,8 @@ public: TActorSetupCmd(resman, TMailboxType::Simple, 0)); if (withSpilling) { - char tempDir[MAX_PATH]; - if (MakeTempDir(tempDir, nullptr) != 0) - ythrow yexception() << "LocalServiceHolder: Can't create temporary directory " << tempDir; + auto tempDir = NDq::GetTmpSpillingRootForCurrentUser(); + MakeDirIfNotExist(tempDir); auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = true}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup)); |