diff options
author | alexvru <alexvru@ydb.tech> | 2023-03-27 16:43:28 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-03-27 16:43:28 +0300 |
commit | 773aa8b4c48879485bc7cbaa6995d68a574530a1 (patch) | |
tree | e5cc3a7d8b0ce299ed8f6e2f423f9c0f149675cf | |
parent | 728860f3dcbfa5715bcea78401a55d22e6b24b04 (diff) | |
download | ydb-773aa8b4c48879485bc7cbaa6995d68a574530a1.tar.gz |
Fix asan problem with coro tx
-rw-r--r-- | library/cpp/actors/core/actor.h | 16 | ||||
-rw-r--r-- | ydb/core/blob_depot/coro_tx.cpp | 23 | ||||
-rw-r--r-- | ydb/core/blob_depot/coro_tx.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_decommit.cpp | 22 | ||||
-rw-r--r-- | ydb/core/blob_depot/data_load.cpp | 2 |
5 files changed, 41 insertions, 26 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 8330d860c8d..5e04c0f2abf 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -468,20 +468,24 @@ namespace NActors { // must be called to wrap any call trasitions from one actor to another template<typename TActor, typename TMethod, typename... TArgs> - static decltype((std::declval<TActor>().*std::declval<TMethod>())(std::declval<TArgs>()...)) - InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) { - struct TRecurseContext: TActorContext { - TActivationContext* Prev; + static std::invoke_result_t<TMethod, TActor, TArgs...> InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) { + struct TRecurseContext : TActorContext { + TActivationContext* const Prev; + TRecurseContext(const TActorId& actorId) : TActorContext(TActivationContext::ActorContextFor(actorId)) - , Prev(TlsActivationContext) { + , Prev(TlsActivationContext) + { TlsActivationContext = this; } + ~TRecurseContext() { + Y_VERIFY(TlsActivationContext == this, "TlsActivationContext mismatch; probably InvokeOtherActor was invoked from a coroutine"); TlsActivationContext = Prev; } } context(actor.SelfId()); - return (actor.*method)(std::forward<TArgs>(args)...); + + return std::invoke(std::forward<TMethod>(method), actor, std::forward<TArgs>(args)...); } virtual void Registered(TActorSystem* sys, const TActorId& owner); diff --git a/ydb/core/blob_depot/coro_tx.cpp b/ydb/core/blob_depot/coro_tx.cpp index 319bcb85739..84caf07bfc1 100644 --- a/ydb/core/blob_depot/coro_tx.cpp +++ b/ydb/core/blob_depot/coro_tx.cpp @@ -24,16 +24,16 @@ namespace NKikimr::NBlobDepot { EOutcome Outcome = EOutcome::UNSET; - std::weak_ptr<TToken> Token; + TTokens Tokens; std::function<void()> Body; bool Finished = false; public: - TContext(const std::weak_ptr<TToken>& token, std::function<void()>&& body) + TContext(TTokens&& tokens, std::function<void()>&& body) : Stack(65536) , Context({this, TArrayRef(Stack.Begin(), Stack.End())}) - , Token(token) + , Tokens(std::move(tokens)) , Body(std::move(body)) { #ifndef NDEBUG @@ -90,14 +90,23 @@ namespace NKikimr::NBlobDepot { Outcome = outcome; Y_VERIFY(BackContext); Context.SwitchTo(BackContext); - if (Token.expired() || Finished) { + if (IsExpired() || Finished) { throw TExDead(); } } private: + bool IsExpired() const { + for (auto& token : Tokens) { + if (token.expired()) { + return true; + } + } + return false; + } + void DoRun() override { - if (!Token.expired()) { + if (!IsExpired()) { try { Body(); } catch (const TExDead&) { @@ -109,9 +118,9 @@ namespace NKikimr::NBlobDepot { } }; - TCoroTx::TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body) + TCoroTx::TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body) : TTransactionBase(self) - , Context(std::make_unique<TContext>(token, std::move(body))) + , Context(std::make_unique<TContext>(std::move(tokens), std::move(body))) {} TCoroTx::TCoroTx(TCoroTx& predecessor) diff --git a/ydb/core/blob_depot/coro_tx.h b/ydb/core/blob_depot/coro_tx.h index f731ff88dd1..14d892337aa 100644 --- a/ydb/core/blob_depot/coro_tx.h +++ b/ydb/core/blob_depot/coro_tx.h @@ -7,6 +7,8 @@ namespace NKikimr::NBlobDepot { struct TExDead {}; + using TTokens = std::vector<std::weak_ptr<TToken>>; + class TCoroTx : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { class TContext; std::unique_ptr<TContext> Context; @@ -14,7 +16,7 @@ namespace NKikimr::NBlobDepot { static thread_local TCoroTx *Current; public: - TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body); + TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body); TCoroTx(TCoroTx& predecessor); ~TCoroTx(); diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp index fee51472b5a..4d469b25aa2 100644 --- a/ydb/core/blob_depot/data_decommit.cpp +++ b/ydb/core/blob_depot/data_decommit.cpp @@ -12,6 +12,7 @@ namespace NKikimr::NBlobDepot { TBlobDepot* const Self; std::weak_ptr<TToken> Token; + std::shared_ptr<TToken> ActorToken = std::make_shared<TToken>(); std::vector<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs; THashSet<TLogoBlobID> ResolutionErrors; TEvBlobDepot::TEvResolve::TPtr Ev; @@ -47,7 +48,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()), (Sender, Ev->Sender), (Cookie, Ev->Cookie)); - Self->Execute(std::make_unique<TCoroTx>(Self, Token, std::bind(&TThis::TxPrepare, this))); + Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Token, ActorToken}}, std::bind(&TThis::TxPrepare, this))); ++TxInFlight; Become(&TThis::StateFunc); } @@ -79,22 +80,21 @@ namespace NKikimr::NBlobDepot { // adjust minId to skip already assimilated items in range query if (minId < Self->Data->LastAssimilatedBlobId) { if (item.GetMustRestoreFirst()) { - InvokeOtherActor(*this, &TThis::ScanRange, TKey(minId), - TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN, - true /*issueGets*/); + ScanRange(TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId), + EScanFlags::INCLUDE_BEGIN, true /*issueGets*/); } minId = *Self->Data->LastAssimilatedBlobId; } // prepare the range first -- we must have it loaded in memory - InvokeOtherActor(*this, &TThis::ScanRange, TKey(minId), TKey(maxId), - EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, false /*issueGets*/); + ScanRange(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, + false /*issueGets*/); // issue scan query - InvokeOtherActor(*this, &TThis::IssueRange, tabletId, minId, maxId, item.GetMustRestoreFirst()); + IssueRange(tabletId, minId, maxId, item.GetMustRestoreFirst()); } else if (item.GetMustRestoreFirst()) { - InvokeOtherActor(*this, &TThis::ScanRange, TKey(minId), TKey(maxId), - EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, true /*issueGets*/); + ScanRange(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, + true /*issueGets*/); } break; @@ -109,7 +109,7 @@ namespace NKikimr::NBlobDepot { const bool doGet = (!value && Self->Data->LastAssimilatedBlobId < key.GetBlobId()) // value not yet assimilated || (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet if (doGet) { - InvokeOtherActor(*this, &TThis::IssueGet, key.GetBlobId(), item.GetMustRestoreFirst()); + IssueGet(key.GetBlobId(), item.GetMustRestoreFirst()); } break; } @@ -309,7 +309,7 @@ namespace NKikimr::NBlobDepot { (Cookie, Ev->Cookie), (ResolutionErrors.size, ResolutionErrors.size()), (DecommitBlobs.size, DecommitBlobs.size())); - Self->Execute(std::make_unique<TCoroTx>(Self, Token, [self = Self, decommitBlobs = std::move(DecommitBlobs), + Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Token}}, [self = Self, decommitBlobs = std::move(DecommitBlobs), ev = Ev, resolutionErrors = std::move(ResolutionErrors)]() mutable { ui32 numItemsProcessed = 0; for (const auto& blob : decommitBlobs) { diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index 3165c168739..fa299aa8d0c 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; void TData::StartLoad() { - Self->Execute(std::make_unique<TCoroTx>(Self, Self->Token, [&] { + Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Self->Token}}, [&] { bool progress = false; TString trash; |