diff options
author | alexvru <[email protected]> | 2023-02-02 17:58:14 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2023-02-02 17:58:14 +0300 |
commit | efcea1e96d8515b4fc6d7f5d67a13024f459bb1e (patch) | |
tree | 86ab5a598815790fc2122775bf30ffbbd0aae759 | |
parent | 18a5238144de7327c5e66ff10063e8f0b2863add (diff) |
Fix some BlobDepot issues
32 files changed, 319 insertions, 152 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index bbbe1416098..42e27b391f2 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -875,6 +875,8 @@ struct TEvBlobStorage { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_BLOBSTORAGE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_BLOBSTORAGE)"); + struct TExecutionRelay {}; + struct TEvPutResult; struct TEvGetResult; struct TEvBlockResult; @@ -914,6 +916,7 @@ struct TEvBlobStorage { mutable NLWTrace::TOrbit Orbit; ui32 RestartCounter = 0; std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvPut(const TLogoBlobID &id, TRcBuf &&buffer, TInstant deadline, NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog, @@ -986,6 +989,7 @@ struct TEvBlobStorage { TString ErrorReason; bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier? mutable NLWTrace::TOrbit Orbit; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags, ui32 groupId, float approximateFreeSpaceShare) @@ -1064,6 +1068,7 @@ struct TEvBlobStorage { ui32 RestartCounter = 0; bool PhantomCheck = false; bool Decommission = false; // is it generated by decommission actor and should be handled by the underlying proxy? + std::shared_ptr<TExecutionRelay> ExecutionRelay; struct TTabletData { TTabletData() = default; @@ -1211,6 +1216,7 @@ struct TEvBlobStorage { TString DebugInfo; TString ErrorReason; mutable NLWTrace::TOrbit Orbit; + std::shared_ptr<TExecutionRelay> ExecutionRelay; // to measure blobstorage->client hop TInstant Sent; @@ -1272,6 +1278,7 @@ struct TEvBlobStorage { const ui64 IssuerGuid = RandomNumber<ui64>() | 1; bool IsMonitored = true; ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvBlock(ui64 tabletId, ui32 generation, TInstant deadline) : TabletId(tabletId) @@ -1312,6 +1319,7 @@ struct TEvBlobStorage { struct TEvBlockResult : public TEventLocal<TEvBlockResult, EvBlockResult> { NKikimrProto::EReplyStatus Status; TString ErrorReason; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvBlockResult(NKikimrProto::EReplyStatus status) : Status(status) @@ -1382,6 +1390,7 @@ struct TEvBlobStorage { const TInstant Deadline; mutable NLWTrace::TOrbit Orbit; ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvPatch(ui32 originalGroupId, const TLogoBlobID &originalId, const TLogoBlobID &patchedId, ui32 maskForCookieBruteForcing, TArrayHolder<TDiff> &&diffs, ui64 diffCount, TInstant deadline) @@ -1508,6 +1517,7 @@ struct TEvBlobStorage { const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained' TString ErrorReason; mutable NLWTrace::TOrbit Orbit; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvPatchResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, TStorageStatusFlags statusFlags, ui32 groupId, float approximateFreeSpaceShare) @@ -1637,6 +1647,7 @@ struct TEvBlobStorage { const ui32 ForceBlockedGeneration; const bool FromLeader; ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvDiscover(ui64 tabletId, ui32 minGeneration, bool readBody, bool discoverBlockedGeneration, TInstant deadline, ui32 forceBlockedGeneration, bool fromLeader) @@ -1683,6 +1694,7 @@ struct TEvBlobStorage { TString Buffer; ui32 BlockedGeneration; TString ErrorReason; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvDiscoverResult(NKikimrProto::EReplyStatus status, ui32 minGeneration, ui32 blockedGeneration) : Status(status) @@ -1740,6 +1752,7 @@ struct TEvBlobStorage { ui32 ForceBlockedGeneration; ui32 RestartCounter = 0; bool Decommission = false; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvRange(ui64 tabletId, const TLogoBlobID &from, const TLogoBlobID &to, const bool mustRestoreFirst, TInstant deadline, bool isIndexOnly = false, ui32 forceBlockedGeneration = 0) @@ -1803,6 +1816,7 @@ struct TEvBlobStorage { TVector<TResponse> Responses; const ui32 GroupId; TString ErrorReason; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, ui32 groupId) : Status(status) @@ -1863,6 +1877,7 @@ struct TEvBlobStorage { bool Decommission = false; ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvCollectGarbage(ui64 tabletId, ui32 recordGeneration, ui32 perGenerationCounter, ui32 channel, bool collect, ui32 collectGeneration, @@ -1972,6 +1987,7 @@ struct TEvBlobStorage { ui32 PerGenerationCounter; ui32 Channel; TString ErrorReason; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvCollectGarbageResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 recordGeneration, ui32 perGenerationCounter, ui32 channel) @@ -2005,6 +2021,7 @@ struct TEvBlobStorage { struct TEvStatus : public TEventLocal<TEvStatus, EvStatus> { const TInstant Deadline; ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvStatus(TInstant deadline) : Deadline(deadline) @@ -2035,6 +2052,7 @@ struct TEvBlobStorage { TStorageStatusFlags StatusFlags; float ApproximateFreeSpaceShare = 0.0f; // zero means absence of correct data TString ErrorReason; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvStatusResult(NKikimrProto::EReplyStatus status, TStorageStatusFlags statusFlags) : Status(status) @@ -2064,6 +2082,7 @@ struct TEvBlobStorage { std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo; std::optional<TLogoBlobID> SkipBlobsUpTo; ui32 RestartCounter = 0; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvAssimilate(std::optional<ui64> skipBlocksUpTo, std::optional<std::tuple<ui64, ui8>> skipBarriersUpTo, std::optional<TLogoBlobID> skipBlobsUpTo) @@ -2181,6 +2200,7 @@ struct TEvBlobStorage { std::deque<TBlock> Blocks; std::deque<TBarrier> Barriers; std::deque<TBlob> Blobs; + std::shared_ptr<TExecutionRelay> ExecutionRelay; TEvAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason = {}) : Status(status) diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index f9b9bb4ef0f..14b364cbebb 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -109,6 +109,9 @@ namespace NKikimr::NBlobDepot { protected: TBlobDepotAgent& Agent; + friend class TBlobDepotAgent; + std::set<std::weak_ptr<TEvBlobStorage::TExecutionRelay>, std::owner_less<std::weak_ptr<TEvBlobStorage::TExecutionRelay>>> SubrequestRelays; + public: using TResponse = std::variant< // internal events @@ -135,7 +138,8 @@ namespace NKikimr::NBlobDepot { TRequestSender(TBlobDepotAgent& agent); virtual ~TRequestSender(); void ClearRequestsInFlight(); - void OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response); + void OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay); protected: virtual void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) = 0; @@ -150,7 +154,7 @@ namespace NKikimr::NBlobDepot { , public TRequestSender { const ui32 VirtualGroupId; - const TActorId ProxyId; + TActorId ProxyId; const ui64 AgentInstanceId; ui64 TabletId = Max<ui64>(); TActorId PipeId; @@ -230,12 +234,14 @@ namespace NKikimr::NBlobDepot { } } if (!info->GetTotalVDisksNum()) { + // proxy finishes serving user requests TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0)); - return; + ProxyId = {}; } } - - TActivationContext::Send(ev->Forward(ProxyId)); + if (ProxyId) { + TActivationContext::Send(ev->Forward(ProxyId)); + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -249,7 +255,8 @@ namespace NKikimr::NBlobDepot { TRequestsInFlight OtherRequestInFlight; void RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, - TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet); + TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay = nullptr); template<typename TEvent> void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev); @@ -257,7 +264,8 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev); - void OnRequestComplete(ui64 id, TRequestSender::TResponse response, TRequestsInFlight& map); + void OnRequestComplete(ui64 id, TRequestSender::TResponse response, TRequestsInFlight& map, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay = nullptr); void DropTabletRequest(ui64 id); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -312,6 +320,7 @@ namespace NKikimr::NBlobDepot { std::multimap<TMonotonic, TQuery*>::iterator QueryWatchdogMapIter; NLog::EPriority WatchdogPriority = NLog::PRI_WARN; bool Destroyed = false; + std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay; static constexpr TDuration WatchdogDuration = TDuration::Seconds(10); @@ -365,7 +374,9 @@ namespace NKikimr::NBlobDepot { TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event) : TQuery(agent, std::move(event)) , Request(*Event->Get<TEvent>()) - {} + { + ExecutionRelay = std::move(Request.ExecutionRelay); + } protected: TEvent& Request; diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index a678898486d..2682683f4e6 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -134,7 +134,7 @@ namespace NKikimr::NBlobDepot { while (!TabletRequestInFlight.empty()) { auto node = TabletRequestInFlight.extract(TabletRequestInFlight.begin()); auto& requestInFlight = node.value(); - requestInFlight.Sender->OnRequestComplete(requestInFlight, TTabletDisconnected{}); + requestInFlight.Sender->OnRequestComplete(requestInFlight, TTabletDisconnected{}, nullptr); } for (auto& [_, kind] : ChannelKinds) { diff --git a/ydb/core/blob_depot/agent/proxy.cpp b/ydb/core/blob_depot/agent/proxy.cpp index a38f53e66ee..e472b3d7919 100644 --- a/ydb/core/blob_depot/agent/proxy.cpp +++ b/ydb/core/blob_depot/agent/proxy.cpp @@ -4,13 +4,51 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::SendToProxy(ui32 groupId, std::unique_ptr<IEventBase> event, TRequestSender *sender, TRequestContext::TPtr context) { + auto executionRelay = std::make_shared<TEvBlobStorage::TExecutionRelay>(); + + switch (event->Type()) { + case TEvBlobStorage::EvPut: + static_cast<TEvBlobStorage::TEvPut&>(*event).ExecutionRelay = executionRelay; + break; + + case TEvBlobStorage::EvGet: + static_cast<TEvBlobStorage::TEvGet&>(*event).ExecutionRelay = executionRelay; + break; + } + const ui64 id = NextOtherRequestId++; - if (groupId == DecommitGroupId) { + auto getQueryId = [&] { + auto *p = dynamic_cast<TQuery*>(sender); + return p ? std::make_optional(p->GetQueryId()) : std::nullopt; + }; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA46, "SendToProxy", (AgentId, LogId), (QueryId, getQueryId()), + (GroupId, groupId), (DecommitGroupId, DecommitGroupId), (Type, event->Type()), (Cookie, id)); + if (groupId != DecommitGroupId) { + SendToBSProxy(SelfId(), groupId, event.release(), id); + } else if (ProxyId) { Send(ProxyId, event.release(), 0, id); } else { - SendToBSProxy(SelfId(), groupId, event.release(), id); + std::unique_ptr<IEventBase> response; + switch (const ui32 type = event->Type()) { + case TEvBlobStorage::EvGet: { + response = static_cast<TEvBlobStorage::TEvGet&>(*event).MakeErrorResponse(NKikimrProto::OK, + "proxy has vanished", groupId); + event.reset(); // drop origin event to prevent extra refcounts on ExecutionRelay + auto& r = static_cast<TEvBlobStorage::TEvGetResult&>(*response); + for (size_t i = 0; i < r.ResponseSz; ++i) { + r.Responses[i].Status = NKikimrProto::NODATA; + } + r.ExecutionRelay = executionRelay; + break; + } + + default: + Y_FAIL("unexpected request type for decommission proxy Type# 0x%08" PRIx32, type); + } + Send(SelfId(), response.release(), 0, id); } - RegisterRequest(id, sender, std::move(context), {}, false); + + RegisterRequest(id, sender, std::move(context), {}, false, std::move(executionRelay)); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 495d9f09f45..947ea8b027c 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -159,6 +159,10 @@ namespace NKikimr::NBlobDepot { auto nh = Agent.QueryWatchdogMap.extract(QueryWatchdogMapIter); nh.key() = now + WatchdogDuration; QueryWatchdogMapIter = Agent.QueryWatchdogMap.insert(std::move(nh)); + for (const auto& cookie : SubrequestRelays) { + Y_VERIFY_S(!cookie.expired(), "AgentId# " << Agent.LogId << " QueryId# " << GetQueryId() + << " subrequest got stuck"); + } } void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { @@ -171,7 +175,9 @@ namespace NKikimr::NBlobDepot { #define XX(TYPE) \ case TEvBlobStorage::TYPE: \ response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, Agent.VirtualGroupId); \ - break; + static_cast<TEvBlobStorage::T##TYPE##Result&>(*response).ExecutionRelay = std::move(ExecutionRelay); \ + break; \ + // ENUMERATE_INCOMING_EVENTS(XX) #undef XX @@ -185,6 +191,16 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime)); + switch (response->Type()) { +#define XX(TYPE) \ + case TEvBlobStorage::TYPE##Result: \ + static_cast<TEvBlobStorage::T##TYPE##Result&>(*response).ExecutionRelay = std::move(ExecutionRelay); \ + break; \ + // + + ENUMERATE_INCOMING_EVENTS(XX) +#undef XX + } Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); OnDestroy(true); DoDestroy(); diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index d367c21b56e..1f9816379e9 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -2,11 +2,15 @@ namespace NKikimr::NBlobDepot { - struct TBlobDepotAgent::TQuery::TReadContext : TRequestContext { + struct TBlobDepotAgent::TQuery::TReadContext + : TRequestContext + , std::enable_shared_from_this<TReadContext> + { TReadArg ReadArg; const ui64 Size; TString Buffer; bool Terminated = false; + bool StopProcessingParts = false; ui32 NumPartsPending = 0; TLogoBlobID BlobWithoutData; @@ -15,17 +19,27 @@ namespace NKikimr::NBlobDepot { , Size(size) {} - void EndWithError(TQuery *query, NKikimrProto::EReplyStatus status, TString errorReason) { - query->OnRead(ReadArg.Tag, status, errorReason); - Terminated = true; - } - void Abort() { Terminated = true; } void EndWithSuccess(TQuery *query) { + Y_VERIFY(!Terminated); query->OnRead(ReadArg.Tag, NKikimrProto::OK, std::move(Buffer)); + Abort(); + } + + void EndWithError(TQuery *query, NKikimrProto::EReplyStatus status, TString errorReason) { + Y_VERIFY(!Terminated); + Y_VERIFY(status != NKikimrProto::NODATA && status != NKikimrProto::OK); + query->OnRead(ReadArg.Tag, status, errorReason); + Abort(); + } + + void EndWithNoData(TQuery *query) { + Y_VERIFY(!Terminated); + query->OnRead(ReadArg.Tag, NKikimrProto::NODATA, {}); + Abort(); } ui64 GetTag() const { @@ -44,7 +58,8 @@ namespace NKikimr::NBlobDepot { bool TBlobDepotAgent::TQuery::IssueRead(TReadArg&& arg, TString& error) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()), - (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value)); + (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size), + (Value, arg.Value)); ui64 outputOffset = 0; @@ -69,7 +84,8 @@ namespace NKikimr::NBlobDepot { if (end <= begin || blobId.BlobSize() < end) { error = "incorrect SubrangeBegin/SubrangeEnd pair"; STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()), - (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value)); + (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size), + (Value, arg.Value)); return false; } @@ -101,7 +117,8 @@ namespace NKikimr::NBlobDepot { if (size) { error = "incorrect offset/size provided"; STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()), - (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value)); + (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size), + (Value, arg.Value)); return false; } @@ -128,14 +145,10 @@ namespace NKikimr::NBlobDepot { partContext->Offsets.push_back(outputOffset); } - // when the TEvGet query is sent to the underlying proxy, MustRestoreFirst must be cleared, or else it may - // lead to ERROR due to impossibility of writes; all MustRestoreFirst should be handled by the TEvResolve - // query - auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), context->ReadArg.GetHandleClass, - context->ReadArg.MustRestoreFirst && groupId != Agent.DecommitGroupId); + auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), context->ReadArg.GetHandleClass); event->ReaderTabletData = context->ReadArg.ReaderTabletData; STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA39, "issuing TEvGet", (AgentId, Agent.LogId), (QueryId, GetQueryId()), - (ReadId, context->GetTag()), (GroupId, groupId), (Msg, *event)); + (ReadId, context->GetTag()), (Key, Agent.PrettyKey(context->ReadArg.Key)), (GroupId, groupId), (Msg, *event)); Agent.SendToProxy(groupId, std::move(event), this, std::move(partContext)); ++context->NumPartsPending; } @@ -148,57 +161,60 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TQuery::HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg) { auto& partContext = context->Obtain<TReadContext::TPartContext>(); auto& readContext = *partContext.Read; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA41, "HandleGetResult", (AgentId, Agent.LogId), (ReadId, readContext.GetTag()), - (Msg, msg)); - if (readContext.Terminated) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA41, "HandleGetResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)), (Msg, msg), + (Terminated, readContext.Terminated)); + if (readContext.Terminated || readContext.StopProcessingParts) { return; // just ignore this read } - if (msg.Status != NKikimrProto::OK) { - readContext.EndWithError(this, msg.Status, std::move(msg.ErrorReason)); - } else { - Y_VERIFY(msg.ResponseSz == partContext.Offsets.size()); - - for (ui32 i = 0; i < msg.ResponseSz; ++i) { - auto& blob = msg.Responses[i]; - if (blob.Status == NKikimrProto::NODATA) { - NKikimrBlobDepot::TEvResolve resolve; - auto *item = resolve.AddItems(); - item->SetExactKey(readContext.ReadArg.Key); - Agent.Issue(std::move(resolve), this, partContext.Read); - readContext.Abort(); - readContext.BlobWithoutData = blob.Id; - return; - } else if (blob.Status != NKikimrProto::OK) { - return readContext.EndWithError(this, blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id); - } + Y_VERIFY(msg.ResponseSz == partContext.Offsets.size()); + + for (ui32 i = 0; i < msg.ResponseSz; ++i) { + auto& blob = msg.Responses[i]; + if (blob.Status == NKikimrProto::NODATA) { + NKikimrBlobDepot::TEvResolve resolve; + auto *item = resolve.AddItems(); + item->SetExactKey(readContext.ReadArg.Key); + item->SetMustRestoreFirst(readContext.ReadArg.MustRestoreFirst); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAxx, "issuing extra resolve", (Agent, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)), (Msg, resolve)); + Agent.Issue(std::move(resolve), this, readContext.shared_from_this()); + readContext.StopProcessingParts = true; + readContext.BlobWithoutData = blob.Id; + return; + } else if (blob.Status != NKikimrProto::OK) { + return readContext.EndWithError(this, blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id); + } - auto& buffer = readContext.Buffer; - const ui64 offset = partContext.Offsets[i]; + auto& buffer = readContext.Buffer; + const ui64 offset = partContext.Offsets[i]; - Y_VERIFY(offset < readContext.Size && blob.Buffer.size() <= readContext.Size - offset); + Y_VERIFY(offset < readContext.Size && blob.Buffer.size() <= readContext.Size - offset); - if (!buffer && !offset) { - buffer = std::move(blob.Buffer); - buffer.resize(readContext.Size); - } else { - if (!buffer) { - buffer = TString::Uninitialized(readContext.Size); - } - memcpy(buffer.Detach() + offset, blob.Buffer.data(), blob.Buffer.size()); + if (!buffer && !offset) { + buffer = std::move(blob.Buffer); + buffer.resize(readContext.Size); + } else { + if (!buffer) { + buffer = TString::Uninitialized(readContext.Size); } + memcpy(buffer.Detach() + offset, blob.Buffer.data(), blob.Buffer.size()); } + } - if (!--readContext.NumPartsPending) { - readContext.EndWithSuccess(this); - } + if (!--readContext.NumPartsPending) { + readContext.EndWithSuccess(this); } } void TBlobDepotAgent::TQuery::HandleResolveResult(const TRequestContext::TPtr& context, TEvBlobDepot::TEvResolveResult& msg) { auto& readContext = context->Obtain<TReadContext>(); + if (readContext.Terminated) { + return; + } STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA42, "HandleResolveResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()), - (ReadId, readContext.GetTag()), (Msg, msg.Record)); + (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)), (Msg, msg.Record)); if (msg.Record.GetStatus() != NKikimrProto::OK) { readContext.EndWithError(this, msg.Record.GetStatus(), msg.Record.GetErrorReason()); } else if (msg.Record.ResolvedKeysSize() == 1) { @@ -210,18 +226,19 @@ namespace NKikimr::NBlobDepot { readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to restart read Error# " << error); } } else if (!item.GetReliablyWritten()) { // this was unassimilated value and we got NODATA for it - readContext.EndWithError(this, NKikimrProto::NODATA, {}); + readContext.EndWithNoData(this); } else { - Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " QueryId# " << GetQueryId() + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to read blob: data seems to be lost", (AgentId, Agent.LogId), + (QueryId, GetQueryId()), (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)), + (BlobId, readContext.BlobWithoutData)); + Y_VERIFY_DEBUG_S(false, "data seems to be lost AgentId# " << Agent.LogId << " QueryId# " << GetQueryId() << " ReadId# " << readContext.GetTag() << " BlobId# " << readContext.BlobWithoutData); - STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to read blob -- data is lost", (AgentId, Agent.LogId), - (QueryId, GetQueryId()), (ReadId, readContext.GetTag()), (BlobId, readContext.BlobWithoutData)); readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to read BlobId# " - << readContext.BlobWithoutData << ": data is lost"); + << readContext.BlobWithoutData << ": data seems to be lost"); } } else { Y_VERIFY(!msg.Record.ResolvedKeysSize()); - readContext.EndWithError(this, NKikimrProto::NODATA, {}); + readContext.EndWithNoData(this); } } diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index dc830f5cecd..621d5be545d 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -36,7 +36,12 @@ namespace NKikimr::NBlobDepot { }); } - void TRequestSender::OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response) { + void TRequestSender::OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) { + if (executionRelay) { + const size_t num = SubrequestRelays.erase(executionRelay); + Y_VERIFY(num); + } requestInFlight.Unlink(); ProcessResponse(requestInFlight.Id, std::move(requestInFlight.Context), std::move(response)); } @@ -63,11 +68,15 @@ namespace NKikimr::NBlobDepot { // TBlobDepotAgent machinery void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context, - TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet) { + TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) { TRequestsInFlight& map = toBlobDepotTablet ? TabletRequestInFlight : OtherRequestInFlight; const bool inserted = map.emplace(id, sender, std::move(context), std::move(cancelCallback), toBlobDepotTablet).second; Y_VERIFY(inserted); + if (executionRelay) { + sender->SubrequestRelays.emplace(executionRelay); + } } template<typename TEvent> @@ -91,18 +100,20 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (AgentId, LogId), - (Id, ev->Cookie), (Type, TypeName<TEvent>())); - OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (AgentId, LogId), (Id, ev->Cookie), + (Type, TypeName<TEvent>()), (Msg, *ev->Get())); + Y_VERIFY(ev->Get()->ExecutionRelay); + OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight, std::move(ev->Get()->ExecutionRelay)); } template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev); template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev); - void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) { + void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) { if (auto node = map.extract(id)) { auto& requestInFlight = node.value(); - requestInFlight.Sender->OnRequestComplete(requestInFlight, std::move(response)); + requestInFlight.Sender->OnRequestComplete(requestInFlight, std::move(response), std::move(executionRelay)); } } diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 4e97c34738b..697dfb2d495 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -103,7 +103,7 @@ namespace NKikimr::NBlobDepot { } void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (AgentId, Agent.LogId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Tag, tag), (Status, status), (Buffer.size, status == NKikimrProto::OK ? buffer.size() : 0), (ErrorReason, status != NKikimrProto::OK ? buffer : "")); diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 70e50a56f17..d9a2b049f6b 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -11,7 +11,6 @@ namespace NKikimr::NBlobDepot { std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response; ui32 ReadsInFlight = 0; - ui32 ResolvesInFlight = 0; std::map<TLogoBlobID, TString> FoundBlobs; std::vector<TRead> Reads; bool Reverse = false; @@ -27,10 +26,7 @@ namespace NKikimr::NBlobDepot { Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, Request.From, Request.To, Agent.VirtualGroupId); - IssueResolve(); - } - - void IssueResolve() { + // issue resolve query TString from = Request.From.AsBinaryString(); TString to = Request.To.AsBinaryString(); Reverse = Request.To < Request.From; @@ -50,18 +46,6 @@ namespace NKikimr::NBlobDepot { item->SetMustRestoreFirst(Request.MustRestoreFirst); Agent.Issue(std::move(resolve), this, nullptr); - ++ResolvesInFlight; - } - - void IssueResolve(ui64 tag) { - NKikimrBlobDepot::TEvResolve resolve; - auto *item = resolve.AddItems(); - item->SetExactKey(Reads[tag].Id.AsBinaryString()); - item->SetMustRestoreFirst(Request.MustRestoreFirst); - item->SetCookie(tag); - - Agent.Issue(std::move(resolve), this, nullptr); - ++ResolvesInFlight; } void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override { @@ -81,7 +65,8 @@ namespace NKikimr::NBlobDepot { } void HandleResolveResult(ui64 id, TRequestContext::TPtr context, NKikimrBlobDepot::TEvResolveResult& msg) { - --ResolvesInFlight; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA47, "HandleResolveResult", (AgentId, Agent.LogId), + (QueryId, GetQueryId()), (Msg, msg)); if (msg.GetStatus() != NKikimrProto::OK && msg.GetStatus() != NKikimrProto::OVERRUN) { return EndWithError(msg.GetStatus(), msg.GetErrorReason()); @@ -139,7 +124,8 @@ namespace NKikimr::NBlobDepot { case NKikimrProto::OK: { Y_VERIFY(dataOrErrorReason.size() == read.Id.BlobSize()); const bool inserted = FoundBlobs.try_emplace(read.Id, std::move(dataOrErrorReason)).second; - Y_VERIFY(inserted); + Y_VERIFY_S(inserted, "AgentId# " << Agent.LogId << " QueryId# " << GetQueryId() + << " duplicate BlobId# " << read.Id << " received"); break; } @@ -157,7 +143,7 @@ namespace NKikimr::NBlobDepot { } void CheckAndFinish() { - if (!ReadsInFlight && !ResolvesInFlight && !Finished) { + if (!ReadsInFlight && !Finished) { for (auto& [id, buffer] : FoundBlobs) { Y_VERIFY_S(buffer.size() == Request.IsIndexOnly ? 0 : id.BlobSize(), "Id# " << id << " Buffer.size# " << buffer.size()); Response->Responses.emplace_back(id, std::move(buffer)); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 0ce533696a6..5251191bd47 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -454,7 +454,7 @@ namespace NKikimr::NBlobDepot { }; STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.Connection->NodeId), - (Id, ev->Cookie), (Channel, channelIndex), (InvalidatedStep, invalidatedStep), + (Id, ev->Cookie), (Channel, int(channelIndex)), (InvalidatedStep, invalidatedStep), (GivenIdRanges, channel.GivenIdRanges), (Agent.GivenIdRanges, agent.GivenIdRanges[channelIndex]), (WritesInFlight, makeWritesInFlight())); diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 7231cc8226c..463b6979c7c 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -379,6 +379,9 @@ namespace NKikimr::NBlobDepot { ui64 MaxKeys = 0; ui32 PrechargeRows = 0; ui64 PrechargeBytes = 0; +#ifndef NDEBUG + std::set<TKey> KeysInRange = {}; // runtime state +#endif }; private: @@ -490,9 +493,8 @@ namespace NKikimr::NBlobDepot { } while (rowset.IsValid()) { TKey key = TKey::FromBinaryKey(rowset.GetKey(), Data->Self->Config); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "ScanRange.Load", (Id, Data->Self->GetLogId()), (Left, left), + STLOG(PRI_TRACE, BLOB_DEPOT, BDT46, "ScanRange.Load", (Id, Data->Self->GetLogId()), (Left, left), (Right, right), (Key, key)); - LastProcessedKey.emplace(key); if (left < key && key < right) { TValue* const value = Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(), rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>()); @@ -504,18 +506,16 @@ namespace NKikimr::NBlobDepot { Processing = Range.Flags & EScanFlags::INCLUDE_END ? key <= Range.End : key < Range.End; } } - Processing = Processing && callback(std::move(key), *value); + Processing = Processing && callback(key, *value); *Progress = true; } else { Y_VERIFY_DEBUG(key == left || key == right); } + LastProcessedKey.emplace(std::move(key)); if (!rowset.Next()) { return false; // we break iteration anyway, because we can't read more data } } - if (!LastProcessedKey || (Range.Flags & EScanFlags::REVERSE ? left < *LastProcessedKey : *LastProcessedKey < right)) { - LastProcessedKey.emplace(Range.Flags & EScanFlags::REVERSE ? left : right); - } return Processing; }; }; @@ -526,7 +526,7 @@ namespace NKikimr::NBlobDepot { template<typename TCallback> bool ScanRange(TScanRange& range, NTabletFlatExecutor::TTransactionContext *txc, bool *progress, TCallback&& callback) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "ScanRange", (Id, Self->GetLogId()), (Begin, range.Begin), (End, range.End), + STLOG(PRI_TRACE, BLOB_DEPOT, BDT76, "ScanRange", (Id, Self->GetLogId()), (Begin, range.Begin), (End, range.End), (Flags, range.Flags), (MaxKeys, range.MaxKeys)); const bool reverse = range.Flags & EScanFlags::REVERSE; @@ -534,11 +534,13 @@ namespace NKikimr::NBlobDepot { bool res = true; - auto issue = [&](TKey&& key, const TValue& value) { + auto issue = [&](const TKey& key, const TValue& value) { Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_BEGIN ? range.Begin <= key : range.Begin < key); Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_END ? key <= range.End : key < range.End); - - if (!callback(key, value) || (range.MaxKeys && !--range.MaxKeys)) { +#ifndef NDEBUG + Y_VERIFY(range.KeysInRange.insert(key).second); // ensure that the generated key is unique +#endif + if (!callback(key, value) || !--range.MaxKeys) { return false; // scan aborted by user or finished scanning the required range } else { // remove already scanned items from the range query @@ -549,7 +551,7 @@ namespace NKikimr::NBlobDepot { const auto& from = reverse ? TKey::Min() : range.Begin; const auto& to = reverse ? range.End : TKey::Max(); LoadedKeys.EnumInRange(from, to, reverse, [&](const TKey& left, const TKey& right, bool isRangeLoaded) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "ScanRange.Step", (Id, Self->GetLogId()), (Left, left), (Right, right), + STLOG(PRI_TRACE, BLOB_DEPOT, BDT83, "ScanRange.Step", (Id, Self->GetLogId()), (Left, left), (Right, right), (IsRangeLoaded, isRangeLoaded), (From, from), (To, to)); if (!isRangeLoaded) { // we have to load range (left, right), not including both ends @@ -565,7 +567,7 @@ namespace NKikimr::NBlobDepot { break; } else if (range.Flags & EScanFlags::INCLUDE_BEGIN ? key < range.Begin : key <= range.Begin) { return false; // just left the left side of the range - } else if ((key != range.End || range.Flags & EScanFlags::INCLUDE_END) && !issue(TKey(key), value)) { + } else if ((key != range.End || range.Flags & EScanFlags::INCLUDE_END) && !issue(key, value)) { return false; // enough keys processed } } @@ -576,11 +578,15 @@ namespace NKikimr::NBlobDepot { const auto& [key, value] = *it; if (range.Flags & EScanFlags::INCLUDE_END ? range.End < key : range.End <= key) { return false; // just left the right side of the range - } else if ((key != range.Begin || range.Flags & EScanFlags::INCLUDE_BEGIN) && !issue(TKey(key), value)) { + } else if ((key != range.Begin || range.Flags & EScanFlags::INCLUDE_BEGIN) && !issue(key, value)) { return false; // enough keys processed } } } + if (!loader.LastProcessedKey || (reverse & EScanFlags::REVERSE ? left < *loader.LastProcessedKey : + *loader.LastProcessedKey < right)) { + loader.LastProcessedKey.emplace(reverse ? left : right); + } return true; }); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index ab8820c93b2..c40dc750612 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -131,6 +131,33 @@ NActors::NLog::EPriority PriorityForStatusOutbound(NKikimrProto::EReplyStatus st NActors::NLog::EPriority PriorityForStatusResult(NKikimrProto::EReplyStatus status); NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus status); +inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) { + switch (const ui32 type = ev.Type()) { +#define XX(T) \ + case TEvBlobStorage::Ev##T: \ + static_cast<TEvBlobStorage::TEv##T&>(ev).ExecutionRelay = std::move(executionRelay); \ + break; \ + case TEvBlobStorage::Ev##T##Result: \ + static_cast<TEvBlobStorage::TEv##T##Result&>(ev).ExecutionRelay = std::move(executionRelay); \ + break; \ + // + + XX(Put) + XX(Get) + XX(Block) + XX(Discover) + XX(Range) + XX(CollectGarbage) + XX(Status) + XX(Patch) + XX(Assimilate) +#undef XX + + default: + Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); + } +} + template<typename TDerived> class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActor<TDerived>> { public: @@ -141,7 +168,8 @@ public: TBlobStorageGroupRequestActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TIntrusivePtr<TGroupQueues> groupQueues, TIntrusivePtr<TBlobStorageGroupProxyMon> mon, const TActorId& source, ui64 cookie, NWilson::TTraceId traceId, NKikimrServices::EServiceKikimr logComponent, bool logAccEnabled, TMaybe<TGroupStat::EKind> latencyQueueKind, - TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name) + TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) : TActor<TBlobStorageGroupRequestActor<TDerived>>(&TThis::InitialStateFunc, TDerived::ActorActivityType()) , Info(std::move(info)) , GroupQueues(std::move(groupQueues)) @@ -155,6 +183,7 @@ public: , LatencyQueueKind(latencyQueueKind) , RequestStartTime(now) , RacingDomains(&Info->GetTopology()) + , ExecutionRelay(std::move(executionRelay)) { TDerived::ActiveCounter(Mon)->Inc(); Span @@ -272,6 +301,9 @@ public: // make NodeWarden restart the query just after proxy reconfiguration Y_VERIFY_DEBUG(RestartCounter < 100); auto q = self.RestartQuery(RestartCounter + 1); + if (q->Type() != TEvBlobStorage::EvBunchOfEvents) { + SetExecutionRelay(*q, std::exchange(ExecutionRelay, {})); + } ++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)]; const TActorId& proxyId = MakeBlobStorageProxyID(Info->GroupID); TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span.GetTraceId())); @@ -414,6 +446,9 @@ public: } void PassAway() override { + // ensure we didn't keep execution relay on occasion + Y_VERIFY_DEBUG_S(!ExecutionRelay, LogCtx.RequestPrefix << " actor died without properly sending response"); + // ensure that we are dying for the first time Y_VERIFY(!std::exchange(Dead, true)); TDerived::ActiveCounter(Mon)->Dec(); @@ -452,6 +487,13 @@ public: #undef XX } + if (ExecutionRelay) { + SetExecutionRelay(*ev, std::exchange(ExecutionRelay, {})); + ExecutionRelayUsed = true; + } else { + Y_VERIFY(!ExecutionRelayUsed); + } + // ensure that we are dying for the first time Y_VERIFY(!Dead); if (RequestHandleClass && PoolCounters) { @@ -543,6 +585,8 @@ private: std::deque<std::unique_ptr<IEventHandle>> PostponedQ; TBlobStorageGroupInfo::TGroupFailDomains RacingDomains; // a set of domains we've received RACE from TActorId ProxyActorId; + std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay; + bool ExecutionRelayUsed = false; }; void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp index 15138025e56..3e76e63db80 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp @@ -268,7 +268,7 @@ public: NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_ASSIMILATE, false, {}, now, storagePoolCounters, ev->RestartCounter, - "DSProxy.Assimilate") + "DSProxy.Assimilate", std::move(ev->ExecutionRelay)) , SkipBlocksUpTo(ev->SkipBlocksUpTo) , SkipBarriersUpTo(ev->SkipBarriersUpTo) , SkipBlobsUpTo(ev->SkipBlobsUpTo) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp index f381d505a7d..117b9f65886 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp @@ -139,7 +139,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter, - "DSProxy.Block") + "DSProxy.Block", std::move(ev->ExecutionRelay)) , TabletId(ev->TabletId) , Generation(ev->Generation) , Deadline(ev->Deadline) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp index d5875e70955..de0d07e30c5 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp @@ -111,7 +111,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc auto result = std::make_unique<TEvBlobStorage::TEvCollectGarbageResult>(status, TabletId, RecordGeneration, PerGenerationCounter, Channel); result->ErrorReason = ErrorReason; - A_LOG_LOG_S(true, PriorityForStatusOutbound(status), "DSPC02", "Result# " << result->Print(false)); + A_LOG_LOG_S(true, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "DSPC02", "Result# " << result->Print(false)); SendResponseAndDie(std::move(result)); } @@ -147,7 +147,7 @@ public: NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter, - "DSProxy.CollectGarbage") + "DSProxy.CollectGarbage", std::move(ev->ExecutionRelay)) , TabletId(ev->TabletId) , RecordGeneration(ev->RecordGeneration) , PerGenerationCounter(ev->PerGenerationCounter) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp index b9201f16922..ca296d76ad3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp @@ -886,7 +886,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter, - "DSProxy.Discover") + "DSProxy.Discover", std::move(ev->ExecutionRelay)) , TabletId(ev->TabletId) , MinGeneration(ev->MinGeneration) , ReadBody(ev->ReadBody) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp index 9a5c04380ef..31ceba6c386 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp @@ -464,7 +464,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters, - ev->RestartCounter, "DSProxy.Discover(mirror-3-dc)") + ev->RestartCounter, "DSProxy.Discover(mirror-3-dc)", std::move(ev->ExecutionRelay)) , TabletId(ev->TabletId) , MinGeneration(ev->MinGeneration) , StartTime(now) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp index 2b350fe8a6c..4e7ff9c2ce7 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp @@ -36,7 +36,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters, - ev->RestartCounter, "DSProxy.Discover(mirror-3of4)") + ev->RestartCounter, "DSProxy.Discover(mirror-3of4)", std::move(ev->ExecutionRelay)) , TabletId(ev->TabletId) , MinGeneration(ev->MinGeneration) , StartTime(now) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp index 0b89004dd24..bd5ed008eab 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp @@ -465,6 +465,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvGet, requestSize, duration.SecondsFloat() * 1000.0, tabletId, evResult->GroupId, channel, NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()), success); + A_LOG_LOG_S(true, success ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPG68", "Result# " << evResult->Print(false)); return SendResponseAndDie(std::unique_ptr<TEvBlobStorage::TEvGetResult>(evResult.Release())); } @@ -493,7 +494,8 @@ public: TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_GET, ev->IsVerboseNoDataEnabled || ev->CollectDebugInfo, - latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get") + latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get", + std::move(ev->ExecutionRelay)) , GetImpl(info, state, ev, std::move(nodeLayout), LogCtx.RequestPrefix) , Orbit(std::move(ev->Orbit)) , Deadline(ev->Deadline) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h index 5a80f045d55..81198774537 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h @@ -124,14 +124,15 @@ public: TString DumpQuery() const { TStringStream str; - str << "{"; + str << '{'; + str << "MustRestoreFirst# " << MustRestoreFirst; for (ui32 i = 0; i < QuerySize; ++i) { - str << (i ? " " : "") + str << ' ' << Queries[i].Id - << "@" << Queries[i].Shift - << ":" << Queries[i].Size; + << '@' << Queries[i].Shift + << ':' << Queries[i].Size; } - str << "}"; + str << '}'; return str.Str(); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index 47d60411f30..8e46ae6c369 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -285,6 +285,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> ? NKikimrProto::ERROR : NKikimrProto::NO_GROUP; auto response = ev->Get()->MakeErrorResponse(status, ErrorDescription, GroupId); + SetExecutionRelay(*response, std::move(ev->Get()->ExecutionRelay)); NActors::NLog::EPriority priority = CheckPriorityForErrorState(); LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::BS_PROXY, ExtraLogInfo << "Group# " << GroupId << " HandleError ev# " << ev->Get()->Print(false) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp index 82499e11089..c15adde2177 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp @@ -35,7 +35,7 @@ class TBlobStorageGroupIndexRestoreGetRequest THashMap<TLogoBlobID, std::pair<bool, bool>> KeepFlags; void ReplyAndDie(NKikimrProto::EReplyStatus status) { - A_LOG_DEBUG_S("DSPI14", "ReplyAndDie" + A_LOG_INFO_S("DSPI14", "ReplyAndDie" << " Reply with status# " << NKikimrProto::EReplyStatus_Name(status) << " PendingResult# " << (PendingResult ? PendingResult->ToString().data() : "nullptr")); if (status != NKikimrProto::OK) { @@ -270,7 +270,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_INDEXRESTOREGET, false, latencyQueueKind, now, storagePoolCounters, - ev->RestartCounter, "DSProxy.IndexRestoreGet") + ev->RestartCounter, "DSProxy.IndexRestoreGet", std::move(ev->ExecutionRelay)) , QuerySize(ev->QuerySize) , Queries(ev->Queries.Release()) , Deadline(ev->Deadline) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp index 28d623e7d77..e16fb06f16f 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp @@ -97,7 +97,7 @@ public: NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_MULTICOLLECT, false, {}, now, storagePoolCounters, 0, - "DSProxy.MultiCollect") + "DSProxy.MultiCollect", std::move(ev->ExecutionRelay)) , TabletId(ev->TabletId) , RecordGeneration(ev->RecordGeneration) , PerGenerationCounter(ev->PerGenerationCounter) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp index e5d734c9d78..eba8d52a259 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp @@ -97,7 +97,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_MULTIGET, false, latencyQueueKind, now, storagePoolCounters, 0, - "DSProxy.MultiGet") + "DSProxy.MultiGet", std::move(ev->ExecutionRelay)) , QuerySize(ev->QuerySize) , Queries(ev->Queries.Release()) , Deadline(ev->Deadline) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index b067f4a5f65..8950f18d7d3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -108,7 +108,7 @@ public: bool useVPatch = false) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_PATCH, false, {}, now, storagePoolCounters, - ev->RestartCounter, "DSProxy.Patch") + ev->RestartCounter, "DSProxy.Patch", std::move(ev->ExecutionRelay)) , OriginalGroupId(ev->OriginalGroupId) , OriginalId(ev->OriginalId) , PatchedId(ev->PatchedId) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 5c682482e68..a9ce350f309 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -329,7 +329,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> &putResult, ui64 blobIdx) { NKikimrProto::EReplyStatus status = putResult->Status; - A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_DEBUG : NLog::PRI_NOTICE, "BPP21", + A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPP21", "SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent << " PutImpl.Blobs.size# " << PutImpl.Blobs.size() << " Last# " << (ResponsesSent + 1 == PutImpl.Blobs.size() ? "true" : "false")); @@ -353,6 +353,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt LWTRACK(DSProxyPutReply, PutImpl.Blobs[blobIdx].Orbit); putResult->Orbit = std::move(PutImpl.Blobs[blobIdx].Orbit); putResult->WrittenBeyondBarrier = PutImpl.WrittenBeyondBarrier[blobIdx]; + putResult->ExecutionRelay = std::move(PutImpl.Blobs[blobIdx].ExecutionRelay); if (!IsManyPuts) { SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr); } else { @@ -402,6 +403,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt item.Span.GetTraceId() )); put->RestartCounter = counter; + put->ExecutionRelay = std::move(item.ExecutionRelay); } return ev; } @@ -429,7 +431,7 @@ public: bool enableRequestMod3x3ForMinLatecy) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters, - ev->RestartCounter, "DSProxy.Put") + ev->RestartCounter, "DSProxy.Put", nullptr) , PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy, source, cookie, Span.GetTraceId()) , WaitingVDiskResponseCount(info->GetTotalVDisksNum()) , Deadline(ev->Deadline) @@ -472,7 +474,7 @@ public: bool enableRequestMod3x3ForMinLatecy) : TBlobStorageGroupRequestActor(info, state, mon, TActorId(), 0, NWilson::TTraceId(), NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters, - MaxRestartCounter(events), "DSProxy.Put") + MaxRestartCounter(events), "DSProxy.Put", nullptr) , PutImpl(info, state, events, mon, handleClass, tactic, enableRequestMod3x3ForMinLatecy) , WaitingVDiskResponseCount(info->GetTotalVDisksNum()) , IsManyPuts(true) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 2695ea1ea83..52d22ffec5e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -55,9 +55,11 @@ private: bool Replied = false; std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; NWilson::TSpan Span; + std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay; TBlobInfo(TLogoBlobID id, TRope&& buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId, - NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single) + NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single, + std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) : BlobId(id) , Buffer(std::move(buffer)) , BufferSize(Buffer.size()) @@ -66,6 +68,7 @@ private: , Orbit(std::move(orbit)) , ExtraBlockChecks(std::move(extraBlockChecks)) , Span(single ? NWilson::TSpan() : NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Put.Blob")) + , ExecutionRelay(std::move(executionRelay)) {} void Output(IOutputStream& s) const { @@ -118,7 +121,7 @@ public: , Tactic(ev->Tactic) { Blobs.emplace_back(ev->Id, TRope(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit), - std::move(ev->ExtraBlockChecks), true); + std::move(ev->ExtraBlockChecks), true, std::move(ev->ExecutionRelay)); auto& blob = Blobs.back(); LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic, @@ -147,7 +150,7 @@ public: Y_VERIFY(msg.HandleClass == putHandleClass); Y_VERIFY(msg.Tactic == tactic); Blobs.emplace_back(msg.Id, TRope(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId), - std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false); + std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false, std::move(msg.ExecutionRelay)); Deadline = Max(Deadline, msg.Deadline); auto& blob = Blobs.back(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index fbcf7312169..d1783af11f8 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -299,7 +299,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob if (To < From) { std::reverse(result->Responses.begin(), result->Responses.end()); } - A_LOG_LOG_S(true, PriorityForStatusOutbound(status), "DSR05", "Result# " << result->Print(false)); + A_LOG_LOG_S(true, NLog::PRI_INFO, "DSR05", "Result# " << result->Print(false)); SendReply(result); } @@ -307,7 +307,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob std::unique_ptr<TEvBlobStorage::TEvRangeResult> result(new TEvBlobStorage::TEvRangeResult( status, From, To, Info->GroupID)); result->ErrorReason = ErrorReason; - A_LOG_LOG_S(true, PriorityForStatusOutbound(status), "DSR06", "Result# " << result->Print(false)); + A_LOG_LOG_S(true, NLog::PRI_NOTICE, "DSR06", "Result# " << result->Print(false)); SendReply(result); } @@ -342,7 +342,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_RANGE, false, {}, now, storagePoolCounters, - ev->RestartCounter, "DSProxy.Range") + ev->RestartCounter, "DSProxy.Range", std::move(ev->ExecutionRelay)) , TabletId(ev->TabletId) , From(ev->From) , To(ev->To) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index 87b4d72b5df..6a622b7b7a2 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -91,6 +91,7 @@ namespace NKikimr { std::unique_ptr<TEvBlobStorage::TEvPutResult> result( new TEvBlobStorage::TEvPutResult(NKikimrProto::ERROR, ev->Get()->Id, 0, GroupId, 0.f)); result->ErrorReason = str.Str(); + result->ExecutionRelay = std::move(ev->Get()->ExecutionRelay); LOG_ERROR_S(*TlsActivationContext, NKikimrServices::BS_PROXY, "HandleNormal ev# " << ev->Get()->Print(false) << " result# " << result->Print(false) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp index dfbe247be85..8ddf88414f2 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp @@ -90,7 +90,7 @@ public: ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_STATUS, false, {}, now, storagePoolCounters, - ev->RestartCounter, "DSProxy.Status") + ev->RestartCounter, "DSProxy.Status", std::move(ev->ExecutionRelay)) , Deadline(ev->Deadline) , Requests(0) , Responses(0) diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp index b9404a73b12..4917dad2724 100644 --- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp +++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp @@ -15,43 +15,50 @@ namespace NKikimr { void Handle(TEvBlobStorage::TEvPut::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM01, "TEvPut", (Msg, ev->Get()->ToString())); - Send(ev->Sender, Model->Handle(ev->Get()),0, ev->Cookie); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } void Handle(TEvBlobStorage::TEvGet::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM02, "TEvGet", (Msg, ev->Get()->ToString())); - Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } void Handle(TEvBlobStorage::TEvBlock::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM03, "TEvBlock", (Msg, ev->Get()->ToString())); - Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } void Handle(TEvBlobStorage::TEvDiscover::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM04, "TEvDiscover", (Msg, ev->Get()->ToString())); - Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } void Handle(TEvBlobStorage::TEvRange::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM05, "TEvRange", (Msg, ev->Get()->ToString())); - Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } void Handle(TEvBlobStorage::TEvCollectGarbage::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM06, "TEvCollectGarbage", (Msg, ev->Get()->ToString())); - Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie); } void Handle(TEvBlobStorage::TEvStatus::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM07, "TEvStatus", (Msg, ev->Get()->ToString())); - Send(ev->Sender, new TEvBlobStorage::TEvStatusResult(NKikimrProto::OK, Model->GetStorageStatusFlags()), 0, - ev->Cookie); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), new TEvBlobStorage::TEvStatusResult(NKikimrProto::OK, + Model->GetStorageStatusFlags())), 0, ev->Cookie); } void Handle(TEvBlobStorage::TEvAssimilate::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM09, "TEvAssimilate", (Msg, ev->Get()->ToString())); - Send(ev->Sender, new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::ERROR, "not implemented")); + Send(ev->Sender, CopyExecutionRelay(ev->Get(), new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::ERROR, + "not implemented")), 0, ev->Cookie); + } + + template<typename TOut, typename TIn> + TOut *CopyExecutionRelay(TIn *in, TOut *out) { + out->ExecutionRelay = std::move(in->ExecutionRelay); + return out; } void HandlePoison(TEvents::TEvPoisonPill::TPtr& ev) { diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp index 5e25b1d46b2..80de996e32f 100644 --- a/ydb/core/mind/bscontroller/virtual_group.cpp +++ b/ydb/core/mind/bscontroller/virtual_group.cpp @@ -338,8 +338,10 @@ namespace NKikimr::NBsController { void OnPipeError(TActorId clientId) { if (clientId == HivePipeId) { HivePipeId = {}; + Bootstrap(); } else if (clientId == BlobDepotPipeId) { BlobDepotPipeId = {}; + Bootstrap(); } } @@ -371,13 +373,12 @@ namespace NKikimr::NBsController { } NTabletPipe::CloseAndForgetClient(SelfId(), HivePipeId); + ConfigureBlobDepot(); } void Handle(TEvHive::TEvTabletCreationResult::TPtr ev) { STLOG(PRI_INFO, BS_CONTROLLER, BSCVG05, "received TEvTabletCreationResult", (TabletId, Self->TabletID()), (Msg, ev->Get()->Record)); - - ConfigureBlobDepot(); } void ConfigureBlobDepot() { |