aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilitov Mikhail <filitovme@gmail.com>2025-04-09 16:22:07 +0200
committerGitHub <noreply@github.com>2025-04-09 16:22:07 +0200
commit751c197f1a8cf88c09b44fee5e4afe528d0a970f (patch)
tree5380da9c60b6a775f43022555e3489a71af91032
parent122c8d60dbd031e265d172e3b4f6d1346abab1f5 (diff)
downloadydb-751c197f1a8cf88c09b44fee5e4afe528d0a970f.tar.gz
Change exception to error in spilling queue (#16922)
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling_file.cpp67
-rw-r--r--ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp32
2 files changed, 77 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
index fa1c11ace45..083b9746315 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp
@@ -353,7 +353,9 @@ private:
closeOp->FileNames.emplace_back(std::move(fp.FileName));
}
- RunOp("CloseFile", std::move(closeOp), fd);
+ if (!RunOp("CloseFile", std::move(closeOp), fd)) {
+ LOG_E("[CloseFile] Can not run operation");
+ }
} else {
MoveFileToClosed(it);
}
@@ -469,7 +471,12 @@ private:
writeOp->BlobId = msg.BlobId;
writeOp->Blob = std::move(msg.Blob);
- RunOp("Write", std::move(writeOp), fd);
+ if (!RunOp("Write", std::move(writeOp), fd)) {
+ TString error = "[Write] Can not run operation";
+ LOG_E(error);
+
+ Send(ev->Sender, new TEvDqSpilling::TEvError(error));
+ }
}
void HandleWork(TEvPrivate::TEvWriteFileResponse::TPtr& ev) {
@@ -526,8 +533,15 @@ private:
Counters_->SpillingWriteBlobs->Inc();
- Send(msg.Client, new TEvDqSpilling::TEvWriteResult(msg.BlobId));
- RunNextOp(fd);
+ if (RunNextOp(fd)) {
+ Send(msg.Client, new TEvDqSpilling::TEvWriteResult(msg.BlobId));
+ } else {
+ TString error = "[WriteFileResponse] Can not run operation";
+ LOG_E(error);
+
+ Send(ev->Sender, new TEvDqSpilling::TEvError(error));
+ return;
+ }
}
void HandleWork(TEvDqSpilling::TEvRead::TPtr& ev) {
@@ -602,7 +616,12 @@ private:
readOp->RemoveFile = std::move(fp->FileHandle);
}
- RunOp("Read", std::move(readOp), fd);
+ if (!RunOp("Read", std::move(readOp), fd)) {
+ TString error = "[Read] Can not run operation";
+ LOG_E(error);
+
+ Send(ev->Sender, new TEvDqSpilling::TEvError(error));
+ }
}
void HandleWork(TEvPrivate::TEvReadFileResponse::TPtr& ev) {
@@ -660,8 +679,15 @@ private:
Counters_->SpillingReadBlobs->Inc();
- Send(msg.Client, new TEvDqSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob)));
- RunNextOp(fd);
+ if (RunNextOp(fd)) {
+ Send(msg.Client, new TEvDqSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob)));
+ } else {
+ TString error = "[ReadFileResponse] Can not run operation";
+ LOG_E(error);
+
+ Send(ev->Sender, new TEvDqSpilling::TEvError(error));
+ return;
+ }
}
void HandleWork(NMon::TEvHttpInfo::TPtr& ev) {
@@ -789,25 +815,28 @@ private:
}
private:
- void RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
+
+ bool RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
if (fd.HasActiveOp) {
fd.Ops.emplace_back(opName, std::move(op));
- } else {
- fd.HasActiveOp = true;
- // TODO: retry if fails
- IoThreadPool_->SafeAddAndOwn(std::move(op));
+ return true;
}
+
+ fd.HasActiveOp = true;
+
+ return IoThreadPool_->AddAndOwn(std::move(op));
}
- void RunNextOp(TFileDesc& fd) {
+ bool RunNextOp(TFileDesc& fd) {
fd.HasActiveOp = false;
- if (!fd.Ops.empty()) {
- auto op = std::move(fd.Ops.front().second);
- auto opName = fd.Ops.front().first;
- fd.Ops.pop_front();
-
- RunOp(opName, std::move(op), fd);
+ if (fd.Ops.empty()) {
+ return true;
}
+ auto op = std::move(fd.Ops.front().second);
+ auto opName = fd.Ops.front().first;
+ fd.Ops.pop_front();
+
+ return RunOp(opName, std::move(op), fd);
}
void MoveFileToClosed(TFilesIt it) {
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
index 04f4a6ecc3c..18d583c8e36 100644
--- a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
+++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp
@@ -53,7 +53,7 @@ public:
}
TActorId StartSpillingService(ui64 maxTotalSize = 1000, ui64 maxFileSize = 500,
- ui64 maxFilePartSize = 100, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix())
+ ui64 maxFilePartSize = 100, ui32 ioThreadPoolQueueSize = 1000, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix())
{
SpillingRoot_ = root;
SpillingSessionId_ = CreateGuidAsString();
@@ -63,7 +63,8 @@ public:
.SpillingSessionId = SpillingSessionId_,
.MaxTotalSize = maxTotalSize,
.MaxFileSize = maxFileSize,
- .MaxFilePartSize = maxFilePartSize
+ .MaxFilePartSize = maxFilePartSize,
+ .IoThreadPoolQueueSize = ioThreadPoolQueueSize
};
auto counters = Counters();
@@ -497,11 +498,36 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
}
}
+ Y_UNIT_TEST(ThreadPoolQueueOverflow) {
+ TTestActorRuntime runtime;
+ runtime.Initialize();
+
+ runtime.StartSpillingService(1000, 500, 10, 1);
+ ui32 iters = 100;
+ std::vector<TActorId> testers;
+ std::vector<TActorId> spillingActors;
+ for (ui32 i = 0; i < iters; ++i) {
+ testers.emplace_back(runtime.AllocateEdgeActor());
+ spillingActors.emplace_back(runtime.StartSpillingActor(testers.back()));
+ }
+
+ runtime.WaitBootstrap();
+
+ for (ui32 i = 0; i < iters; ++i) {
+ auto ev = new TEvDqSpilling::TEvWrite(i, CreateRope(10, 'a'));
+ runtime.Send(new IEventHandle(spillingActors[i], testers[i], ev));
+ }
+
+ auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(TDuration::Seconds(1));
+ Cerr << resp->Message << Endl;
+ UNIT_ASSERT_EQUAL("[Write] Can not run operation", resp->Message);
+ }
+
Y_UNIT_TEST(StartError) {
TTestActorRuntime runtime;
runtime.Initialize();
- auto spillingService = runtime.StartSpillingService(100, 500, 100, TFsPath("/nonexistent") / runtime.GetSpillingPrefix());
+ auto spillingService = runtime.StartSpillingService(100, 500, 100, 1000, TFsPath("/nonexistent") / runtime.GetSpillingPrefix());
auto tester = runtime.AllocateEdgeActor();
auto spillingActor = runtime.StartSpillingActor(tester);