diff options
author | uniquelogin <pavel.zuev@gmail.com> | 2025-01-24 14:29:14 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-01-24 12:29:14 +0300 |
commit | f1461921e53f5e780bdd7f4f8dc3585cebfde37e (patch) | |
tree | a21f8c0f3c0da96b9cc252fd608e1cd77adf54f4 | |
parent | 672f2e50fdb79ce7e0c645eaaf6680084ba6f8e9 (diff) | |
download | ydb-f1461921e53f5e780bdd7f4f8dc3585cebfde37e.tar.gz |
#13427 Add a monitoring counter for fds kept open by the spilling system (#13611)
4 files changed, 90 insertions, 0 deletions
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp index d3023385c1f..0f882d55fdb 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp @@ -10,6 +10,7 @@ TSpillingCounters::TSpillingCounters(const TIntrusivePtr<::NMonitoring::TDynamic SpillingTooBigFileErrors = counters->GetCounter("Spilling/TooBigFileErrors", true); SpillingNoSpaceErrors = counters->GetCounter("Spilling/NoSpaceErrors", true); SpillingIoErrors = counters->GetCounter("Spilling/IoErrors", true); + SpillingFileDescriptors = counters->GetCounter("Spilling/FileDescriptors", false); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.h b/ydb/library/yql/dq/actors/spilling/spilling_counters.h index 1672ac654af..81ba98ebe7f 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_counters.h +++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.h @@ -17,6 +17,7 @@ struct TSpillingCounters : public TThrRefBase { ::NMonitoring::TDynamicCounters::TCounterPtr SpillingTooBigFileErrors; ::NMonitoring::TDynamicCounters::TCounterPtr SpillingNoSpaceErrors; ::NMonitoring::TDynamicCounters::TCounterPtr SpillingIoErrors; + ::NMonitoring::TDynamicCounters::TCounterPtr SpillingFileDescriptors; }; struct TSpillingTaskCounters : public TThrRefBase { diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp index 17a11f605d4..fa1c11ace45 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp @@ -369,6 +369,8 @@ private: return; } + Counters_->SpillingFileDescriptors->Sub(it->second.PartsList.size()); + ui64 blobs = 0; for (auto& fp : it->second.PartsList) { blobs += fp.Blobs.size(); @@ -502,6 +504,7 @@ private: Counters_->SpillingTotalSpaceUsed->Add(blobDesc.Size); if (msg.NewFileHandle) { + Counters_->SpillingFileDescriptors->Inc(); fp->FileHandle.Swap(msg.NewFileHandle); } } else { @@ -636,6 +639,7 @@ private: Counters_->SpillingTotalSpaceUsed->Sub(fp->Size); Counters_->SpillingStoredBlobs->Sub(fp->Blobs.size()); + Counters_->SpillingFileDescriptors->Dec(); fd.Parts.erase(msg.BlobId); fd.PartsList.remove_if([fp](const auto& x) { return &x == fp; }); @@ -681,6 +685,7 @@ private: TAG(TH2) { s << "Active files"; } PRE() { s << "Used space: " << TotalSize_ << Endl; } + PRE() { s << "Used file descriptors: " << Counters_->SpillingFileDescriptors->Val() << Endl; } for (const auto& tx : byTx) { TAG(TH2) { s << "Transaction " << tx.first; } diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp index 26a1f9dd772..04f4a6ecc3c 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp @@ -382,6 +382,89 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { } } + template<bool MultiPart> + void DoFdCounterTest() + { + TTestActorRuntime runtime; + runtime.Initialize(); + + auto spillingService = runtime.StartSpillingService(1000, 100, 25); + auto tester = runtime.AllocateEdgeActor(); + auto spillingActor = runtime.StartSpillingActor(tester, MultiPart); + + runtime.WaitBootstrap(); + + const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "_" << runtime.GetSpillingSessionId() << "/1_test_"; + + constexpr const size_t numBlobs = 5; + constexpr const size_t numFiles = MultiPart ? numBlobs : 1; + + auto assertFdCounter = [&](const size_t expected) { + THttpRequest httpReq(HTTP_METHOD_GET); + NMonitoring::TMonService2HttpRequest monReq(nullptr, &httpReq, nullptr, nullptr, "", nullptr); + + runtime.Send(new IEventHandle(spillingService, tester, new NMon::TEvHttpInfo(monReq))); + auto resp = runtime.GrabEdgeEvent<NMon::TEvHttpInfoRes>(tester, TDuration::Seconds(1)); + UNIT_ASSERT(((NMon::TEvHttpInfoRes*) resp->Get())->Answer.Contains(TStringBuilder() << "Used file descriptors: " << expected)); + }; + + // write some blobs; one file per blob is created when MultiPart is true, a file per client otherwise + for (ui32 i = 0; i < numBlobs; ++i) { + auto ev = new TEvDqSpilling::TEvWrite(i, CreateRope(20, 'a' + i)); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester); + } + + assertFdCounter(numFiles); + + // read back a single blob + { + const size_t blobIdx = 0; + auto ev = new TEvDqSpilling::TEvRead(blobIdx); + runtime.Send(new IEventHandle(spillingActor, tester, ev)); + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester); + } + + if (MultiPart) { + assertFdCounter(numFiles - 1); + } else { + assertFdCounter(numFiles); + } + + // close everything + { + runtime.Send(new IEventHandle(spillingActor, tester, new TEvents::TEvPoison)); + + std::atomic<bool> done = false; + runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { + if (event->GetRecipientRewrite() == spillingService) { + if (event->GetTypeRewrite() == 2146435074 /* EvCloseFileResponse */) { + done = true; + } + } + return TTestActorRuntimeBase::EEventAction::PROCESS; + }); + + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return (bool) done; + }; + + runtime.DispatchEvents(options, TDuration::Seconds(1)); + } + + assertFdCounter(0); + } + + Y_UNIT_TEST(FdCounterSingleFile) { + DoFdCounterTest<false>(); + } + + Y_UNIT_TEST(FdCounterMultiFile) { + DoFdCounterTest<true>(); + } + Y_UNIT_TEST(ReadError) { TTestActorRuntime runtime; runtime.Initialize(); |