aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author uniquelogin <pavel.zuev@gmail.com> 2025-01-24 14:29:14 +0500
committer GitHub <noreply@github.com> 2025-01-24 12:29:14 +0300
commit f1461921e53f5e780bdd7f4f8dc3585cebfde37e (patch)
tree a21f8c0f3c0da96b9cc252fd608e1cd77adf54f4
parent 672f2e50fdb79ce7e0c645eaaf6680084ba6f8e9 (diff)
download ydb-f1461921e53f5e780bdd7f4f8dc3585cebfde37e.tar.gz
#13427 Add a monitoring counter for fds kept open by the spilling system (#13611)
-rw-r--r-- ydb/library/yql/dq/actors/spilling/spilling_counters.cpp | 1
-rw-r--r-- ydb/library/yql/dq/actors/spilling/spilling_counters.h | 1
-rw-r--r-- ydb/library/yql/dq/actors/spilling/spilling_file.cpp | 5
-rw-r--r-- ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp | 83
4 files changed, 90 insertions, 0 deletions
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
index d3023385c1f..0f882d55fdb 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.cpp
@@ -10,6 +10,7 @@ TSpillingCounters::TSpillingCounters(const TIntrusivePtr<::NMonitoring::TDynamic
SpillingTooBigFileErrors = counters->GetCounter("Spilling/TooBigFileErrors", true);
SpillingNoSpaceErrors = counters->GetCounter("Spilling/NoSpaceErrors", true);
SpillingIoErrors = counters->GetCounter("Spilling/IoErrors", true);
+ SpillingFileDescriptors = counters->GetCounter("Spilling/FileDescriptors", false);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_counters.h b/ydb/library/yql/dq/actors/spilling/spilling_counters.h
index 1672ac654af..81ba98ebe7f 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_counters.h
+++ b/ydb/library/yql/dq/actors/spilling/spilling_counters.h
@@ -17,6 +17,7 @@ struct TSpillingCounters : public TThrRefBase {
::NMonitoring::TDynamicCounters::TCounterPtr SpillingTooBigFileErrors;
::NMonitoring::TDynamicCounters::TCounterPtr SpillingNoSpaceErrors;
::NMonitoring::TDynamicCounters::TCounterPtr SpillingIoErrors;
+ ::NMonitoring::TDynamicCounters::TCounterPtr SpillingFileDescriptors;
};
struct TSpillingTaskCounters : public TThrRefBase {
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
index 17a11f605d4..fa1c11ace45 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
@@ -369,6 +369,8 @@ private:
return;
}
+ Counters_->SpillingFileDescriptors->Sub(it->second.PartsList.size());
+
ui64 blobs = 0;
for (auto& fp : it->second.PartsList) {
blobs += fp.Blobs.size();
@@ -502,6 +504,7 @@ private:
Counters_->SpillingTotalSpaceUsed->Add(blobDesc.Size);
if (msg.NewFileHandle) {
+ Counters_->SpillingFileDescriptors->Inc();
fp->FileHandle.Swap(msg.NewFileHandle);
}
} else {
@@ -636,6 +639,7 @@ private:
Counters_->SpillingTotalSpaceUsed->Sub(fp->Size);
Counters_->SpillingStoredBlobs->Sub(fp->Blobs.size());
+ Counters_->SpillingFileDescriptors->Dec();
fd.Parts.erase(msg.BlobId);
fd.PartsList.remove_if([fp](const auto& x) { return &x == fp; });
@@ -681,6 +685,7 @@ private:
TAG(TH2) { s << "Active files"; }
PRE() { s << "Used space: " << TotalSize_ << Endl; }
+ PRE() { s << "Used file descriptors: " << Counters_->SpillingFileDescriptors->Val() << Endl; }
for (const auto& tx : byTx) {
TAG(TH2) { s << "Transaction " << tx.first; }
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
index 26a1f9dd772..04f4a6ecc3c 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
@@ -382,6 +382,89 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
}
}
+    // Shared scenario for the file-descriptor counter tests.
+    //
+    // Writes a few blobs through a spilling actor, reads one of them back, then poisons the
+    // actor, and after each step checks the "Used file descriptors: N" line rendered on the
+    // spilling service monitoring page.
+    //
+    // MultiPart is forwarded to StartSpillingActor. The expected counts below assume one file
+    // per blob when it is true and a single file per client otherwise.
+    template<bool MultiPart>
+    void DoFdCounterTest()
+    {
+        // Flag raised by the event observer installed in the "close everything" step.
+        // It is declared before the runtime on purpose: the observer stays registered in the
+        // runtime after that step and is invoked again by later dispatches, so the flag must
+        // outlive every possible observer call (locals are destroyed in reverse order).
+        std::atomic<bool> done = false;
+
+        TTestActorRuntime runtime;
+        runtime.Initialize();
+
+        auto spillingService = runtime.StartSpillingService(1000, 100, 25);
+        auto tester = runtime.AllocateEdgeActor();
+        auto spillingActor = runtime.StartSpillingActor(tester, MultiPart);
+
+        runtime.WaitBootstrap();
+
+        constexpr size_t numBlobs = 5;
+        constexpr size_t numFiles = MultiPart ? numBlobs : 1;
+
+        // Requests the monitoring page from the spilling service and asserts that it reports
+        // the expected number of open file descriptors.
+        auto assertFdCounter = [&](const size_t expected) {
+            THttpRequest httpReq(HTTP_METHOD_GET);
+            NMonitoring::TMonService2HttpRequest monReq(nullptr, &httpReq, nullptr, nullptr, "", nullptr);
+
+            runtime.Send(new IEventHandle(spillingService, tester, new NMon::TEvHttpInfo(monReq)));
+            auto resp = runtime.GrabEdgeEvent<NMon::TEvHttpInfoRes>(tester, TDuration::Seconds(1));
+            // The grab is time-limited, so make sure a response actually arrived before using it.
+            UNIT_ASSERT(resp);
+
+            const TString expectedLine = TStringBuilder() << "Used file descriptors: " << expected;
+            UNIT_ASSERT_C(resp->Get()->Answer.Contains(expectedLine), expectedLine);
+        };
+
+        // write some blobs; one file per blob is created when MultiPart is true, a file per client otherwise
+        for (ui32 i = 0; i < numBlobs; ++i) {
+            auto ev = new TEvDqSpilling::TEvWrite(i, CreateRope(20, 'a' + i));
+            runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
+            UNIT_ASSERT(resp);
+        }
+
+        assertFdCounter(numFiles);
+
+        // read back a single blob
+        {
+            const size_t blobIdx = 0;
+            auto ev = new TEvDqSpilling::TEvRead(blobIdx);
+            runtime.Send(new IEventHandle(spillingActor, tester, ev));
+
+            auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvReadResult>(tester);
+            UNIT_ASSERT(resp);
+        }
+
+        // In multi-part mode one fewer descriptor is expected after the read (presumably the
+        // file holding the blob is released once the blob has been read back - confirm against
+        // the service implementation); in single-file mode the count is unchanged.
+        assertFdCounter(MultiPart ? numFiles - 1 : numFiles);
+
+        // close everything
+        {
+            runtime.Send(new IEventHandle(spillingActor, tester, new TEvents::TEvPoison));
+
+            // spillingService is captured by value and done lives at function scope, so the
+            // observer holds no reference to anything that dies before the runtime does.
+            runtime.SetObserverFunc([&done, spillingService](TAutoPtr<IEventHandle>& event) {
+                if (event->GetRecipientRewrite() == spillingService) {
+                    if (event->GetTypeRewrite() == 2146435074 /* EvCloseFileResponse */) {
+                        done = true;
+                    }
+                }
+                return TTestActorRuntimeBase::EEventAction::PROCESS;
+            });
+
+            TDispatchOptions options;
+            options.CustomFinalCondition = [&done]() {
+                return done.load();
+            };
+
+            runtime.DispatchEvents(options, TDuration::Seconds(1));
+        }
+
+        assertFdCounter(0);
+    }
+
+ // Runs the shared fd-counter scenario with MultiPart = false.
+ Y_UNIT_TEST(FdCounterSingleFile) {
+ DoFdCounterTest<false>();
+ }
+
+ // Runs the shared fd-counter scenario with MultiPart = true.
+ Y_UNIT_TEST(FdCounterMultiFile) {
+ DoFdCounterTest<true>();
+ }
+
Y_UNIT_TEST(ReadError) {
TTestActorRuntime runtime;
runtime.Initialize();