diff options
author | Filitov Mikhail <filitovme@gmail.com> | 2025-04-09 16:22:07 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-09 16:22:07 +0200 |
commit | 751c197f1a8cf88c09b44fee5e4afe528d0a970f (patch) | |
tree | 5380da9c60b6a775f43022555e3489a71af91032 | |
parent | 122c8d60dbd031e265d172e3b4f6d1346abab1f5 (diff) | |
download | ydb-751c197f1a8cf88c09b44fee5e4afe528d0a970f.tar.gz |
Change exception to error in spilling queue (#16922)
-rw-r--r-- | ydb/library/yql/dq/actors/spilling/spilling_file.cpp | 67 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp | 32 |
2 files changed, 77 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp index fa1c11ace45..083b9746315 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file.cpp @@ -353,7 +353,9 @@ private: closeOp->FileNames.emplace_back(std::move(fp.FileName)); } - RunOp("CloseFile", std::move(closeOp), fd); + if (!RunOp("CloseFile", std::move(closeOp), fd)) { + LOG_E("[CloseFile] Can not run operation"); + } } else { MoveFileToClosed(it); } @@ -469,7 +471,12 @@ private: writeOp->BlobId = msg.BlobId; writeOp->Blob = std::move(msg.Blob); - RunOp("Write", std::move(writeOp), fd); + if (!RunOp("Write", std::move(writeOp), fd)) { + TString error = "[Write] Can not run operation"; + LOG_E(error); + + Send(ev->Sender, new TEvDqSpilling::TEvError(error)); + } } void HandleWork(TEvPrivate::TEvWriteFileResponse::TPtr& ev) { @@ -526,8 +533,15 @@ private: Counters_->SpillingWriteBlobs->Inc(); - Send(msg.Client, new TEvDqSpilling::TEvWriteResult(msg.BlobId)); - RunNextOp(fd); + if (RunNextOp(fd)) { + Send(msg.Client, new TEvDqSpilling::TEvWriteResult(msg.BlobId)); + } else { + TString error = "[WriteFileResponse] Can not run operation"; + LOG_E(error); + + Send(ev->Sender, new TEvDqSpilling::TEvError(error)); + return; + } } void HandleWork(TEvDqSpilling::TEvRead::TPtr& ev) { @@ -602,7 +616,12 @@ private: readOp->RemoveFile = std::move(fp->FileHandle); } - RunOp("Read", std::move(readOp), fd); + if (!RunOp("Read", std::move(readOp), fd)) { + TString error = "[Read] Can not run operation"; + LOG_E(error); + + Send(ev->Sender, new TEvDqSpilling::TEvError(error)); + } } void HandleWork(TEvPrivate::TEvReadFileResponse::TPtr& ev) { @@ -660,8 +679,15 @@ private: Counters_->SpillingReadBlobs->Inc(); - Send(msg.Client, new TEvDqSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob))); - RunNextOp(fd); + if (RunNextOp(fd)) { + Send(msg.Client, new TEvDqSpilling::TEvReadResult(msg.BlobId, std::move(msg.Blob))); + } else { + TString error = "[ReadFileResponse] Can not run operation"; + LOG_E(error); + + Send(ev->Sender, new TEvDqSpilling::TEvError(error)); + return; + } } void HandleWork(NMon::TEvHttpInfo::TPtr& ev) { @@ -789,25 +815,28 @@ private: } private: - void RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) { + + bool RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) { if (fd.HasActiveOp) { fd.Ops.emplace_back(opName, std::move(op)); - } else { - fd.HasActiveOp = true; - // TODO: retry if fails - IoThreadPool_->SafeAddAndOwn(std::move(op)); + return true; } + + fd.HasActiveOp = true; + + return IoThreadPool_->AddAndOwn(std::move(op)); } - void RunNextOp(TFileDesc& fd) { + bool RunNextOp(TFileDesc& fd) { fd.HasActiveOp = false; - if (!fd.Ops.empty()) { - auto op = std::move(fd.Ops.front().second); - auto opName = fd.Ops.front().first; - fd.Ops.pop_front(); - - RunOp(opName, std::move(op), fd); + if (fd.Ops.empty()) { + return true; } + auto op = std::move(fd.Ops.front().second); + auto opName = fd.Ops.front().first; + fd.Ops.pop_front(); + + return RunOp(opName, std::move(op), fd); } void MoveFileToClosed(TFilesIt it) { diff --git a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp index 04f4a6ecc3c..18d583c8e36 100644 --- a/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp +++ b/ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp @@ -53,7 +53,7 @@ public: } TActorId StartSpillingService(ui64 maxTotalSize = 1000, ui64 maxFileSize = 500, - ui64 maxFilePartSize = 100, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix()) + ui64 maxFilePartSize = 100, ui32 ioThreadPoolQueueSize = 1000, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix()) { SpillingRoot_ = root; SpillingSessionId_ = CreateGuidAsString(); @@ -63,7 +63,8 @@ public: .SpillingSessionId = SpillingSessionId_, .MaxTotalSize = maxTotalSize, .MaxFileSize = maxFileSize, - .MaxFilePartSize = maxFilePartSize + .MaxFilePartSize = maxFilePartSize, + .IoThreadPoolQueueSize = ioThreadPoolQueueSize }; auto counters = Counters(); @@ -497,11 +498,36 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) { } } + Y_UNIT_TEST(ThreadPoolQueueOverflow) { + TTestActorRuntime runtime; + runtime.Initialize(); + + runtime.StartSpillingService(1000, 500, 10, 1); + ui32 iters = 100; + std::vector<TActorId> testers; + std::vector<TActorId> spillingActors; + for (ui32 i = 0; i < iters; ++i) { + testers.emplace_back(runtime.AllocateEdgeActor()); + spillingActors.emplace_back(runtime.StartSpillingActor(testers.back())); + } + + runtime.WaitBootstrap(); + + for (ui32 i = 0; i < iters; ++i) { + auto ev = new TEvDqSpilling::TEvWrite(i, CreateRope(10, 'a')); + runtime.Send(new IEventHandle(spillingActors[i], testers[i], ev)); + } + + auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(TDuration::Seconds(1)); + Cerr << resp->Message << Endl; + UNIT_ASSERT_EQUAL("[Write] Can not run operation", resp->Message); + } + Y_UNIT_TEST(StartError) { TTestActorRuntime runtime; runtime.Initialize(); - auto spillingService = runtime.StartSpillingService(100, 500, 100, TFsPath("/nonexistent") / runtime.GetSpillingPrefix()); + auto spillingService = runtime.StartSpillingService(100, 500, 100, 1000, TFsPath("/nonexistent") / runtime.GetSpillingPrefix()); auto tester = runtime.AllocateEdgeActor(); auto spillingActor = runtime.StartSpillingActor(tester); |