aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-03-27 16:43:28 +0300
committeralexvru <alexvru@ydb.tech>2023-03-27 16:43:28 +0300
commit773aa8b4c48879485bc7cbaa6995d68a574530a1 (patch)
treee5cc3a7d8b0ce299ed8f6e2f423f9c0f149675cf
parent728860f3dcbfa5715bcea78401a55d22e6b24b04 (diff)
downloadydb-773aa8b4c48879485bc7cbaa6995d68a574530a1.tar.gz
Fix asan problem with coro tx
-rw-r--r--library/cpp/actors/core/actor.h16
-rw-r--r--ydb/core/blob_depot/coro_tx.cpp23
-rw-r--r--ydb/core/blob_depot/coro_tx.h4
-rw-r--r--ydb/core/blob_depot/data_decommit.cpp22
-rw-r--r--ydb/core/blob_depot/data_load.cpp2
5 files changed, 41 insertions, 26 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 8330d860c8d..5e04c0f2abf 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -468,20 +468,24 @@ namespace NActors {
// must be called to wrap any call trasitions from one actor to another
template<typename TActor, typename TMethod, typename... TArgs>
- static decltype((std::declval<TActor>().*std::declval<TMethod>())(std::declval<TArgs>()...))
- InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
- struct TRecurseContext: TActorContext {
- TActivationContext* Prev;
+ static std::invoke_result_t<TMethod, TActor, TArgs...> InvokeOtherActor(TActor& actor, TMethod&& method, TArgs&&... args) {
+ struct TRecurseContext : TActorContext {
+ TActivationContext* const Prev;
+
TRecurseContext(const TActorId& actorId)
: TActorContext(TActivationContext::ActorContextFor(actorId))
- , Prev(TlsActivationContext) {
+ , Prev(TlsActivationContext)
+ {
TlsActivationContext = this;
}
+
~TRecurseContext() {
+ Y_VERIFY(TlsActivationContext == this, "TlsActivationContext mismatch; probably InvokeOtherActor was invoked from a coroutine");
TlsActivationContext = Prev;
}
} context(actor.SelfId());
- return (actor.*method)(std::forward<TArgs>(args)...);
+
+ return std::invoke(std::forward<TMethod>(method), actor, std::forward<TArgs>(args)...);
}
virtual void Registered(TActorSystem* sys, const TActorId& owner);
diff --git a/ydb/core/blob_depot/coro_tx.cpp b/ydb/core/blob_depot/coro_tx.cpp
index 319bcb85739..84caf07bfc1 100644
--- a/ydb/core/blob_depot/coro_tx.cpp
+++ b/ydb/core/blob_depot/coro_tx.cpp
@@ -24,16 +24,16 @@ namespace NKikimr::NBlobDepot {
EOutcome Outcome = EOutcome::UNSET;
- std::weak_ptr<TToken> Token;
+ TTokens Tokens;
std::function<void()> Body;
bool Finished = false;
public:
- TContext(const std::weak_ptr<TToken>& token, std::function<void()>&& body)
+ TContext(TTokens&& tokens, std::function<void()>&& body)
: Stack(65536)
, Context({this, TArrayRef(Stack.Begin(), Stack.End())})
- , Token(token)
+ , Tokens(std::move(tokens))
, Body(std::move(body))
{
#ifndef NDEBUG
@@ -90,14 +90,23 @@ namespace NKikimr::NBlobDepot {
Outcome = outcome;
Y_VERIFY(BackContext);
Context.SwitchTo(BackContext);
- if (Token.expired() || Finished) {
+ if (IsExpired() || Finished) {
throw TExDead();
}
}
private:
+ bool IsExpired() const {
+ for (auto& token : Tokens) {
+ if (token.expired()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
void DoRun() override {
- if (!Token.expired()) {
+ if (!IsExpired()) {
try {
Body();
} catch (const TExDead&) {
@@ -109,9 +118,9 @@ namespace NKikimr::NBlobDepot {
}
};
- TCoroTx::TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body)
+ TCoroTx::TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body)
: TTransactionBase(self)
- , Context(std::make_unique<TContext>(token, std::move(body)))
+ , Context(std::make_unique<TContext>(std::move(tokens), std::move(body)))
{}
TCoroTx::TCoroTx(TCoroTx& predecessor)
diff --git a/ydb/core/blob_depot/coro_tx.h b/ydb/core/blob_depot/coro_tx.h
index f731ff88dd1..14d892337aa 100644
--- a/ydb/core/blob_depot/coro_tx.h
+++ b/ydb/core/blob_depot/coro_tx.h
@@ -7,6 +7,8 @@ namespace NKikimr::NBlobDepot {
struct TExDead {};
+ using TTokens = std::vector<std::weak_ptr<TToken>>;
+
class TCoroTx : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
class TContext;
std::unique_ptr<TContext> Context;
@@ -14,7 +16,7 @@ namespace NKikimr::NBlobDepot {
static thread_local TCoroTx *Current;
public:
- TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body);
+ TCoroTx(TBlobDepot *self, TTokens&& tokens, std::function<void()> body);
TCoroTx(TCoroTx& predecessor);
~TCoroTx();
diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp
index fee51472b5a..4d469b25aa2 100644
--- a/ydb/core/blob_depot/data_decommit.cpp
+++ b/ydb/core/blob_depot/data_decommit.cpp
@@ -12,6 +12,7 @@ namespace NKikimr::NBlobDepot {
TBlobDepot* const Self;
std::weak_ptr<TToken> Token;
+ std::shared_ptr<TToken> ActorToken = std::make_shared<TToken>();
std::vector<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs;
THashSet<TLogoBlobID> ResolutionErrors;
TEvBlobDepot::TEvResolve::TPtr Ev;
@@ -47,7 +48,7 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()),
(Sender, Ev->Sender), (Cookie, Ev->Cookie));
- Self->Execute(std::make_unique<TCoroTx>(Self, Token, std::bind(&TThis::TxPrepare, this)));
+ Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Token, ActorToken}}, std::bind(&TThis::TxPrepare, this)));
++TxInFlight;
Become(&TThis::StateFunc);
}
@@ -79,22 +80,21 @@ namespace NKikimr::NBlobDepot {
// adjust minId to skip already assimilated items in range query
if (minId < Self->Data->LastAssimilatedBlobId) {
if (item.GetMustRestoreFirst()) {
- InvokeOtherActor(*this, &TThis::ScanRange, TKey(minId),
- TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN,
- true /*issueGets*/);
+ ScanRange(TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId),
+ EScanFlags::INCLUDE_BEGIN, true /*issueGets*/);
}
minId = *Self->Data->LastAssimilatedBlobId;
}
// prepare the range first -- we must have it loaded in memory
- InvokeOtherActor(*this, &TThis::ScanRange, TKey(minId), TKey(maxId),
- EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, false /*issueGets*/);
+ ScanRange(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
+ false /*issueGets*/);
// issue scan query
- InvokeOtherActor(*this, &TThis::IssueRange, tabletId, minId, maxId, item.GetMustRestoreFirst());
+ IssueRange(tabletId, minId, maxId, item.GetMustRestoreFirst());
} else if (item.GetMustRestoreFirst()) {
- InvokeOtherActor(*this, &TThis::ScanRange, TKey(minId), TKey(maxId),
- EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, true /*issueGets*/);
+ ScanRange(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
+ true /*issueGets*/);
}
break;
@@ -109,7 +109,7 @@ namespace NKikimr::NBlobDepot {
const bool doGet = (!value && Self->Data->LastAssimilatedBlobId < key.GetBlobId()) // value not yet assimilated
|| (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet
if (doGet) {
- InvokeOtherActor(*this, &TThis::IssueGet, key.GetBlobId(), item.GetMustRestoreFirst());
+ IssueGet(key.GetBlobId(), item.GetMustRestoreFirst());
}
break;
}
@@ -309,7 +309,7 @@ namespace NKikimr::NBlobDepot {
(Cookie, Ev->Cookie), (ResolutionErrors.size, ResolutionErrors.size()),
(DecommitBlobs.size, DecommitBlobs.size()));
- Self->Execute(std::make_unique<TCoroTx>(Self, Token, [self = Self, decommitBlobs = std::move(DecommitBlobs),
+ Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Token}}, [self = Self, decommitBlobs = std::move(DecommitBlobs),
ev = Ev, resolutionErrors = std::move(ResolutionErrors)]() mutable {
ui32 numItemsProcessed = 0;
for (const auto& blob : decommitBlobs) {
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
index 3165c168739..fa299aa8d0c 100644
--- a/ydb/core/blob_depot/data_load.cpp
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
void TData::StartLoad() {
- Self->Execute(std::make_unique<TCoroTx>(Self, Self->Token, [&] {
+ Self->Execute(std::make_unique<TCoroTx>(Self, TTokens{{Self->Token}}, [&] {
bool progress = false;
TString trash;