aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-04-11 00:30:43 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-04-11 00:42:09 +0300
commitca7c60d70c549b4464557560f4d6761a3cb1aefe (patch)
tree7590bbb4a08bb975652fe675028d3eba4b1608f3
parent8ee95ccae309141e34f6f447be95731a1b24342d (diff)
downloadydb-ca7c60d70c549b4464557560f4d6761a3cb1aefe.tar.gz
YT-21517: Fix local variables leak if coro is abandoned
37387257cd070822da8998d67eda34c29a129167
-rw-r--r--yt/yt/core/concurrency/coroutine-inl.h20
-rw-r--r--yt/yt/core/concurrency/coroutine.cpp19
-rw-r--r--yt/yt/core/concurrency/coroutine.h15
3 files changed, 41 insertions, 13 deletions
diff --git a/yt/yt/core/concurrency/coroutine-inl.h b/yt/yt/core/concurrency/coroutine-inl.h
index f9f28eab55..1a849112e1 100644
--- a/yt/yt/core/concurrency/coroutine-inl.h
+++ b/yt/yt/core/concurrency/coroutine-inl.h
@@ -38,21 +38,29 @@ void TCoroutineBase::TTrampoLine<TBody>::DoRun()
{
// Move/Copy stuff on stack frame.
auto* owner = Owner_;
+ bool abandoned = false;
{
auto body = std::move(*Body_);
- owner->Suspend();
+ try {
+ owner->Suspend();
+ } catch (TCoroutineAbandonedException) {
+ abandoned = true;
+ }
// Actual execution.
- try {
- body();
- } catch (...) {
- owner->CoroutineException_ = std::current_exception();
+ if (!abandoned) {
+ try {
+ body();
+ } catch (TCoroutineAbandonedException) {
+ } catch (...) {
+ owner->CoroutineException_ = std::current_exception();
+ }
}
}
- owner->Completed_ = true;
+ owner->State_ = ECoroState::Completed;
owner->Suspend();
YT_ABORT();
diff --git a/yt/yt/core/concurrency/coroutine.cpp b/yt/yt/core/concurrency/coroutine.cpp
index af80580c9e..6dc08abf5d 100644
--- a/yt/yt/core/concurrency/coroutine.cpp
+++ b/yt/yt/core/concurrency/coroutine.cpp
@@ -6,17 +6,26 @@ namespace NYT::NConcurrency::NDetail {
TCoroutineBase::~TCoroutineBase()
{
- std::destroy_at(&CoroutineContext);
+ if (State_ == ECoroState::Running) {
+ State_ = ECoroState::Abandoned;
+ Resume();
+ }
+
+ std::destroy_at(std::launder(&CoroutineContext));
}
-void TCoroutineBase::Suspend() noexcept
+void TCoroutineBase::Suspend()
{
- CoroutineContext.SwitchTo(&CallerContext_);
+ std::launder(&CoroutineContext)->SwitchTo(&CallerContext_);
+
+ if (State_ == ECoroState::Abandoned) {
+ throw TCoroutineAbandonedException{};
+ }
}
void TCoroutineBase::Resume()
{
- CallerContext_.SwitchTo(&CoroutineContext);
+ CallerContext_.SwitchTo(std::launder(&CoroutineContext));
if (CoroutineException_) {
std::exception_ptr exception;
@@ -27,7 +36,7 @@ void TCoroutineBase::Resume()
bool TCoroutineBase::IsCompleted() const noexcept
{
- return Completed_;
+ return State_ == ECoroState::Completed;
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/coroutine.h b/yt/yt/core/concurrency/coroutine.h
index ba1ac139ac..275870456b 100644
--- a/yt/yt/core/concurrency/coroutine.h
+++ b/yt/yt/core/concurrency/coroutine.h
@@ -20,6 +20,14 @@ namespace NYT::NConcurrency {
namespace NDetail {
+DEFINE_ENUM(ECoroState,
+ ((Running) (0))
+ ((Abandoned) (1))
+ ((Completed) (2))
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
class TCoroutineBase
{
public:
@@ -35,7 +43,7 @@ protected:
explicit TCoroutineBase(TBody body, EExecutionStackKind stackKind);
void Resume();
- void Suspend() noexcept;
+ void Suspend();
private:
std::shared_ptr<TExecutionStack> CoroutineStack_;
@@ -48,8 +56,11 @@ private:
TExceptionSafeContext CoroutineContext;
};
+ ECoroState State_ = ECoroState::Running;
+ struct TCoroutineAbandonedException
+ { };
+
std::exception_ptr CoroutineException_;
- bool Completed_ = false;
// NB(arkady-e1ppa): We make a "proxy-trampoline"
// which DoRun consist of two parts: