author     alexvru <[email protected]>  2023-02-02 17:58:14 +0300
committer  alexvru <[email protected]>  2023-02-02 17:58:14 +0300
commit     efcea1e96d8515b4fc6d7f5d67a13024f459bb1e (patch)
tree       86ab5a598815790fc2122775bf30ffbbd0aae759
parent     18a5238144de7327c5e66ff10063e8f0b2863add (diff)
Fix some BlobDepot issues
-rw-r--r--  ydb/core/base/blobstorage.h  20
-rw-r--r--  ydb/core/blob_depot/agent/agent_impl.h  27
-rw-r--r--  ydb/core/blob_depot/agent/comm.cpp  2
-rw-r--r--  ydb/core/blob_depot/agent/proxy.cpp  44
-rw-r--r--  ydb/core/blob_depot/agent/query.cpp  18
-rw-r--r--  ydb/core/blob_depot/agent/read.cpp  131
-rw-r--r--  ydb/core/blob_depot/agent/request.cpp  25
-rw-r--r--  ydb/core/blob_depot/agent/storage_get.cpp  2
-rw-r--r--  ydb/core/blob_depot/agent/storage_range.cpp  26
-rw-r--r--  ydb/core/blob_depot/data.cpp  2
-rw-r--r--  ydb/core/blob_depot/data.h  32
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy.h  46
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_block.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp  4
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_get.cpp  4
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h  11
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_impl.h  1
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp  4
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_put.cpp  8
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h  9
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_range.cpp  6
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_request.cpp  1
-rw-r--r--  ydb/core/blobstorage/dsproxy/dsproxy_status.cpp  2
-rw-r--r--  ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp  25
-rw-r--r--  ydb/core/mind/bscontroller/virtual_group.cpp  5
32 files changed, 319 insertions(+), 152 deletions(-)
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index bbbe1416098..42e27b391f2 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -875,6 +875,8 @@ struct TEvBlobStorage {
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_BLOBSTORAGE),
"expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_BLOBSTORAGE)");
+ struct TExecutionRelay {};
+
struct TEvPutResult;
struct TEvGetResult;
struct TEvBlockResult;
@@ -914,6 +916,7 @@ struct TEvBlobStorage {
mutable NLWTrace::TOrbit Orbit;
ui32 RestartCounter = 0;
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; // (TabletId, Generation) pairs
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvPut(const TLogoBlobID &id, TRcBuf &&buffer, TInstant deadline,
NKikimrBlobStorage::EPutHandleClass handleClass = NKikimrBlobStorage::TabletLog,
@@ -986,6 +989,7 @@ struct TEvBlobStorage {
TString ErrorReason;
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
mutable NLWTrace::TOrbit Orbit;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
@@ -1064,6 +1068,7 @@ struct TEvBlobStorage {
ui32 RestartCounter = 0;
bool PhantomCheck = false;
bool Decommission = false; // is it generated by decommission actor and should be handled by the underlying proxy?
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
struct TTabletData {
TTabletData() = default;
@@ -1211,6 +1216,7 @@ struct TEvBlobStorage {
TString DebugInfo;
TString ErrorReason;
mutable NLWTrace::TOrbit Orbit;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
// to measure blobstorage->client hop
TInstant Sent;
@@ -1272,6 +1278,7 @@ struct TEvBlobStorage {
const ui64 IssuerGuid = RandomNumber<ui64>() | 1;
bool IsMonitored = true;
ui32 RestartCounter = 0;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvBlock(ui64 tabletId, ui32 generation, TInstant deadline)
: TabletId(tabletId)
@@ -1312,6 +1319,7 @@ struct TEvBlobStorage {
struct TEvBlockResult : public TEventLocal<TEvBlockResult, EvBlockResult> {
NKikimrProto::EReplyStatus Status;
TString ErrorReason;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvBlockResult(NKikimrProto::EReplyStatus status)
: Status(status)
@@ -1382,6 +1390,7 @@ struct TEvBlobStorage {
const TInstant Deadline;
mutable NLWTrace::TOrbit Orbit;
ui32 RestartCounter = 0;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvPatch(ui32 originalGroupId, const TLogoBlobID &originalId, const TLogoBlobID &patchedId,
ui32 maskForCookieBruteForcing, TArrayHolder<TDiff> &&diffs, ui64 diffCount, TInstant deadline)
@@ -1508,6 +1517,7 @@ struct TEvBlobStorage {
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
TString ErrorReason;
mutable NLWTrace::TOrbit Orbit;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvPatchResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, TStorageStatusFlags statusFlags,
ui32 groupId, float approximateFreeSpaceShare)
@@ -1637,6 +1647,7 @@ struct TEvBlobStorage {
const ui32 ForceBlockedGeneration;
const bool FromLeader;
ui32 RestartCounter = 0;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvDiscover(ui64 tabletId, ui32 minGeneration, bool readBody, bool discoverBlockedGeneration,
TInstant deadline, ui32 forceBlockedGeneration, bool fromLeader)
@@ -1683,6 +1694,7 @@ struct TEvBlobStorage {
TString Buffer;
ui32 BlockedGeneration;
TString ErrorReason;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvDiscoverResult(NKikimrProto::EReplyStatus status, ui32 minGeneration, ui32 blockedGeneration)
: Status(status)
@@ -1740,6 +1752,7 @@ struct TEvBlobStorage {
ui32 ForceBlockedGeneration;
ui32 RestartCounter = 0;
bool Decommission = false;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvRange(ui64 tabletId, const TLogoBlobID &from, const TLogoBlobID &to, const bool mustRestoreFirst,
TInstant deadline, bool isIndexOnly = false, ui32 forceBlockedGeneration = 0)
@@ -1803,6 +1816,7 @@ struct TEvBlobStorage {
TVector<TResponse> Responses;
const ui32 GroupId;
TString ErrorReason;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvRangeResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &from, const TLogoBlobID &to, ui32 groupId)
: Status(status)
@@ -1863,6 +1877,7 @@ struct TEvBlobStorage {
bool Decommission = false;
ui32 RestartCounter = 0;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvCollectGarbage(ui64 tabletId, ui32 recordGeneration, ui32 perGenerationCounter, ui32 channel,
bool collect, ui32 collectGeneration,
@@ -1972,6 +1987,7 @@ struct TEvBlobStorage {
ui32 PerGenerationCounter;
ui32 Channel;
TString ErrorReason;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvCollectGarbageResult(NKikimrProto::EReplyStatus status, ui64 tabletId,
ui32 recordGeneration, ui32 perGenerationCounter, ui32 channel)
@@ -2005,6 +2021,7 @@ struct TEvBlobStorage {
struct TEvStatus : public TEventLocal<TEvStatus, EvStatus> {
const TInstant Deadline;
ui32 RestartCounter = 0;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvStatus(TInstant deadline)
: Deadline(deadline)
@@ -2035,6 +2052,7 @@ struct TEvBlobStorage {
TStorageStatusFlags StatusFlags;
float ApproximateFreeSpaceShare = 0.0f; // zero means absence of correct data
TString ErrorReason;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvStatusResult(NKikimrProto::EReplyStatus status, TStorageStatusFlags statusFlags)
: Status(status)
@@ -2064,6 +2082,7 @@ struct TEvBlobStorage {
std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo;
std::optional<TLogoBlobID> SkipBlobsUpTo;
ui32 RestartCounter = 0;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvAssimilate(std::optional<ui64> skipBlocksUpTo, std::optional<std::tuple<ui64, ui8>> skipBarriersUpTo,
std::optional<TLogoBlobID> skipBlobsUpTo)
@@ -2181,6 +2200,7 @@ struct TEvBlobStorage {
std::deque<TBlock> Blocks;
std::deque<TBarrier> Barriers;
std::deque<TBlob> Blobs;
+ std::shared_ptr<TExecutionRelay> ExecutionRelay;
TEvAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason = {})
: Status(status)
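
Editor's note: the hunks above introduce TExecutionRelay, an empty marker struct carried as a std::shared_ptr by both a request event and its matching *Result event. Because both sides share one control block, a sender can watch the relay through a std::weak_ptr and tell whether the response for a subrequest has already been delivered and consumed (the weak_ptr expires) or is still in flight (it has not). The standalone sketch below, which is not part of the commit, shows that tracking pattern; only TExecutionRelay itself comes from the diff, every other name is illustrative.

// Standalone sketch (not part of the commit) of the relay-tracking pattern.
#include <cassert>
#include <memory>
#include <set>

struct TExecutionRelay {};

int main() {
    // The sender keeps weak references, keyed by owner, like SubrequestRelays below.
    std::set<std::weak_ptr<TExecutionRelay>,
             std::owner_less<std::weak_ptr<TExecutionRelay>>> subrequestRelays;

    auto request = std::make_shared<TExecutionRelay>(); // attached to the request event
    subrequestRelays.insert(request);                   // remembered when the request is registered

    // While the request/result pair is in flight the relay stays alive,
    // which is what a watchdog can assert on.
    assert(!subrequestRelays.begin()->expired());

    // The result event hands back the same shared_ptr; erasing by the
    // shared_ptr works because owner_less compares control blocks.
    std::shared_ptr<TExecutionRelay> fromResult = request;
    assert(subrequestRelays.erase(fromResult) == 1);    // done on request completion

    // Once both events drop their references the relay is gone for good.
    std::weak_ptr<TExecutionRelay> watch = request;
    request.reset();
    fromResult.reset();
    assert(watch.expired());
    return 0;
}

In the commit itself the same idea backs both the SubrequestRelays bookkeeping added to the agent below and the ExecutionRelay members added to every TEv*/TEv*Result pair above.
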
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index f9b9bb4ef0f..14b364cbebb 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -109,6 +109,9 @@ namespace NKikimr::NBlobDepot {
protected:
TBlobDepotAgent& Agent;
+ friend class TBlobDepotAgent;
+ std::set<std::weak_ptr<TEvBlobStorage::TExecutionRelay>, std::owner_less<std::weak_ptr<TEvBlobStorage::TExecutionRelay>>> SubrequestRelays;
+
public:
using TResponse = std::variant<
// internal events
@@ -135,7 +138,8 @@ namespace NKikimr::NBlobDepot {
TRequestSender(TBlobDepotAgent& agent);
virtual ~TRequestSender();
void ClearRequestsInFlight();
- void OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response);
+ void OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay);
protected:
virtual void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) = 0;
@@ -150,7 +154,7 @@ namespace NKikimr::NBlobDepot {
, public TRequestSender
{
const ui32 VirtualGroupId;
- const TActorId ProxyId;
+ TActorId ProxyId;
const ui64 AgentInstanceId;
ui64 TabletId = Max<ui64>();
TActorId PipeId;
@@ -230,12 +234,14 @@ namespace NKikimr::NBlobDepot {
}
}
if (!info->GetTotalVDisksNum()) {
+ // proxy finishes serving user requests
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, ProxyId, {}, nullptr, 0));
- return;
+ ProxyId = {};
}
}
-
- TActivationContext::Send(ev->Forward(ProxyId));
+ if (ProxyId) {
+ TActivationContext::Send(ev->Forward(ProxyId));
+ }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -249,7 +255,8 @@ namespace NKikimr::NBlobDepot {
TRequestsInFlight OtherRequestInFlight;
void RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context,
- TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet);
+ TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay = nullptr);
template<typename TEvent>
void HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev);
@@ -257,7 +264,8 @@ namespace NKikimr::NBlobDepot {
template<typename TEvent>
void HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev);
- void OnRequestComplete(ui64 id, TRequestSender::TResponse response, TRequestsInFlight& map);
+ void OnRequestComplete(ui64 id, TRequestSender::TResponse response, TRequestsInFlight& map,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay = nullptr);
void DropTabletRequest(ui64 id);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -312,6 +320,7 @@ namespace NKikimr::NBlobDepot {
std::multimap<TMonotonic, TQuery*>::iterator QueryWatchdogMapIter;
NLog::EPriority WatchdogPriority = NLog::PRI_WARN;
bool Destroyed = false;
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
static constexpr TDuration WatchdogDuration = TDuration::Seconds(10);
@@ -365,7 +374,9 @@ namespace NKikimr::NBlobDepot {
TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
: TQuery(agent, std::move(event))
, Request(*Event->Get<TEvent>())
- {}
+ {
+ ExecutionRelay = std::move(Request.ExecutionRelay);
+ }
protected:
TEvent& Request;
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index a678898486d..2682683f4e6 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -134,7 +134,7 @@ namespace NKikimr::NBlobDepot {
while (!TabletRequestInFlight.empty()) {
auto node = TabletRequestInFlight.extract(TabletRequestInFlight.begin());
auto& requestInFlight = node.value();
- requestInFlight.Sender->OnRequestComplete(requestInFlight, TTabletDisconnected{});
+ requestInFlight.Sender->OnRequestComplete(requestInFlight, TTabletDisconnected{}, nullptr);
}
for (auto& [_, kind] : ChannelKinds) {
diff --git a/ydb/core/blob_depot/agent/proxy.cpp b/ydb/core/blob_depot/agent/proxy.cpp
index a38f53e66ee..e472b3d7919 100644
--- a/ydb/core/blob_depot/agent/proxy.cpp
+++ b/ydb/core/blob_depot/agent/proxy.cpp
@@ -4,13 +4,51 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::SendToProxy(ui32 groupId, std::unique_ptr<IEventBase> event, TRequestSender *sender,
TRequestContext::TPtr context) {
+ auto executionRelay = std::make_shared<TEvBlobStorage::TExecutionRelay>();
+
+ switch (event->Type()) {
+ case TEvBlobStorage::EvPut:
+ static_cast<TEvBlobStorage::TEvPut&>(*event).ExecutionRelay = executionRelay;
+ break;
+
+ case TEvBlobStorage::EvGet:
+ static_cast<TEvBlobStorage::TEvGet&>(*event).ExecutionRelay = executionRelay;
+ break;
+ }
+
const ui64 id = NextOtherRequestId++;
- if (groupId == DecommitGroupId) {
+ auto getQueryId = [&] {
+ auto *p = dynamic_cast<TQuery*>(sender);
+ return p ? std::make_optional(p->GetQueryId()) : std::nullopt;
+ };
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA46, "SendToProxy", (AgentId, LogId), (QueryId, getQueryId()),
+ (GroupId, groupId), (DecommitGroupId, DecommitGroupId), (Type, event->Type()), (Cookie, id));
+ if (groupId != DecommitGroupId) {
+ SendToBSProxy(SelfId(), groupId, event.release(), id);
+ } else if (ProxyId) {
Send(ProxyId, event.release(), 0, id);
} else {
- SendToBSProxy(SelfId(), groupId, event.release(), id);
+ std::unique_ptr<IEventBase> response;
+ switch (const ui32 type = event->Type()) {
+ case TEvBlobStorage::EvGet: {
+ response = static_cast<TEvBlobStorage::TEvGet&>(*event).MakeErrorResponse(NKikimrProto::OK,
+ "proxy has vanished", groupId);
+ event.reset(); // drop origin event to prevent extra refcounts on ExecutionRelay
+ auto& r = static_cast<TEvBlobStorage::TEvGetResult&>(*response);
+ for (size_t i = 0; i < r.ResponseSz; ++i) {
+ r.Responses[i].Status = NKikimrProto::NODATA;
+ }
+ r.ExecutionRelay = executionRelay;
+ break;
+ }
+
+ default:
+ Y_FAIL("unexpected request type for decommission proxy Type# 0x%08" PRIx32, type);
+ }
+ Send(SelfId(), response.release(), 0, id);
}
- RegisterRequest(id, sender, std::move(context), {}, false);
+
+ RegisterRequest(id, sender, std::move(context), {}, false, std::move(executionRelay));
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index 495d9f09f45..947ea8b027c 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -159,6 +159,10 @@ namespace NKikimr::NBlobDepot {
auto nh = Agent.QueryWatchdogMap.extract(QueryWatchdogMapIter);
nh.key() = now + WatchdogDuration;
QueryWatchdogMapIter = Agent.QueryWatchdogMap.insert(std::move(nh));
+ for (const auto& cookie : SubrequestRelays) {
+ Y_VERIFY_S(!cookie.expired(), "AgentId# " << Agent.LogId << " QueryId# " << GetQueryId()
+ << " subrequest got stuck");
+ }
}
void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) {
@@ -171,7 +175,9 @@ namespace NKikimr::NBlobDepot {
#define XX(TYPE) \
case TEvBlobStorage::TYPE: \
response = Event->Get<TEvBlobStorage::T##TYPE>()->MakeErrorResponse(status, errorReason, Agent.VirtualGroupId); \
- break;
+ static_cast<TEvBlobStorage::T##TYPE##Result&>(*response).ExecutionRelay = std::move(ExecutionRelay); \
+ break; \
+ //
ENUMERATE_INCOMING_EVENTS(XX)
#undef XX
@@ -185,6 +191,16 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (AgentId, Agent.LogId),
(QueryId, GetQueryId()), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime));
+ switch (response->Type()) {
+#define XX(TYPE) \
+ case TEvBlobStorage::TYPE##Result: \
+ static_cast<TEvBlobStorage::T##TYPE##Result&>(*response).ExecutionRelay = std::move(ExecutionRelay); \
+ break; \
+ //
+
+ ENUMERATE_INCOMING_EVENTS(XX)
+#undef XX
+ }
Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie);
OnDestroy(true);
DoDestroy();
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index d367c21b56e..1f9816379e9 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -2,11 +2,15 @@
namespace NKikimr::NBlobDepot {
- struct TBlobDepotAgent::TQuery::TReadContext : TRequestContext {
+ struct TBlobDepotAgent::TQuery::TReadContext
+ : TRequestContext
+ , std::enable_shared_from_this<TReadContext>
+ {
TReadArg ReadArg;
const ui64 Size;
TString Buffer;
bool Terminated = false;
+ bool StopProcessingParts = false;
ui32 NumPartsPending = 0;
TLogoBlobID BlobWithoutData;
@@ -15,17 +19,27 @@ namespace NKikimr::NBlobDepot {
, Size(size)
{}
- void EndWithError(TQuery *query, NKikimrProto::EReplyStatus status, TString errorReason) {
- query->OnRead(ReadArg.Tag, status, errorReason);
- Terminated = true;
- }
-
void Abort() {
Terminated = true;
}
void EndWithSuccess(TQuery *query) {
+ Y_VERIFY(!Terminated);
query->OnRead(ReadArg.Tag, NKikimrProto::OK, std::move(Buffer));
+ Abort();
+ }
+
+ void EndWithError(TQuery *query, NKikimrProto::EReplyStatus status, TString errorReason) {
+ Y_VERIFY(!Terminated);
+ Y_VERIFY(status != NKikimrProto::NODATA && status != NKikimrProto::OK);
+ query->OnRead(ReadArg.Tag, status, errorReason);
+ Abort();
+ }
+
+ void EndWithNoData(TQuery *query) {
+ Y_VERIFY(!Terminated);
+ query->OnRead(ReadArg.Tag, NKikimrProto::NODATA, {});
+ Abort();
}
ui64 GetTag() const {
@@ -44,7 +58,8 @@ namespace NKikimr::NBlobDepot {
bool TBlobDepotAgent::TQuery::IssueRead(TReadArg&& arg, TString& error) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
- (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value));
+ (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
+ (Value, arg.Value));
ui64 outputOffset = 0;
@@ -69,7 +84,8 @@ namespace NKikimr::NBlobDepot {
if (end <= begin || blobId.BlobSize() < end) {
error = "incorrect SubrangeBegin/SubrangeEnd pair";
STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
- (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value));
+ (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
+ (Value, arg.Value));
return false;
}
@@ -101,7 +117,8 @@ namespace NKikimr::NBlobDepot {
if (size) {
error = "incorrect offset/size provided";
STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
- (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value));
+ (ReadId, arg.Tag), (Key, Agent.PrettyKey(arg.Key)), (Offset, arg.Offset), (Size, arg.Size),
+ (Value, arg.Value));
return false;
}
@@ -128,14 +145,10 @@ namespace NKikimr::NBlobDepot {
partContext->Offsets.push_back(outputOffset);
}
- // when the TEvGet query is sent to the underlying proxy, MustRestoreFirst must be cleared, or else it may
- // lead to ERROR due to impossibility of writes; all MustRestoreFirst should be handled by the TEvResolve
- // query
- auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), context->ReadArg.GetHandleClass,
- context->ReadArg.MustRestoreFirst && groupId != Agent.DecommitGroupId);
+ auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), context->ReadArg.GetHandleClass);
event->ReaderTabletData = context->ReadArg.ReaderTabletData;
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA39, "issuing TEvGet", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
- (ReadId, context->GetTag()), (GroupId, groupId), (Msg, *event));
+ (ReadId, context->GetTag()), (Key, Agent.PrettyKey(context->ReadArg.Key)), (GroupId, groupId), (Msg, *event));
Agent.SendToProxy(groupId, std::move(event), this, std::move(partContext));
++context->NumPartsPending;
}
@@ -148,57 +161,60 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::TQuery::HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg) {
auto& partContext = context->Obtain<TReadContext::TPartContext>();
auto& readContext = *partContext.Read;
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA41, "HandleGetResult", (AgentId, Agent.LogId), (ReadId, readContext.GetTag()),
- (Msg, msg));
- if (readContext.Terminated) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA41, "HandleGetResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)), (Msg, msg),
+ (Terminated, readContext.Terminated));
+ if (readContext.Terminated || readContext.StopProcessingParts) {
return; // just ignore this read
}
- if (msg.Status != NKikimrProto::OK) {
- readContext.EndWithError(this, msg.Status, std::move(msg.ErrorReason));
- } else {
- Y_VERIFY(msg.ResponseSz == partContext.Offsets.size());
-
- for (ui32 i = 0; i < msg.ResponseSz; ++i) {
- auto& blob = msg.Responses[i];
- if (blob.Status == NKikimrProto::NODATA) {
- NKikimrBlobDepot::TEvResolve resolve;
- auto *item = resolve.AddItems();
- item->SetExactKey(readContext.ReadArg.Key);
- Agent.Issue(std::move(resolve), this, partContext.Read);
- readContext.Abort();
- readContext.BlobWithoutData = blob.Id;
- return;
- } else if (blob.Status != NKikimrProto::OK) {
- return readContext.EndWithError(this, blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id);
- }
+ Y_VERIFY(msg.ResponseSz == partContext.Offsets.size());
+
+ for (ui32 i = 0; i < msg.ResponseSz; ++i) {
+ auto& blob = msg.Responses[i];
+ if (blob.Status == NKikimrProto::NODATA) {
+ NKikimrBlobDepot::TEvResolve resolve;
+ auto *item = resolve.AddItems();
+ item->SetExactKey(readContext.ReadArg.Key);
+ item->SetMustRestoreFirst(readContext.ReadArg.MustRestoreFirst);
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAxx, "issuing extra resolve", (Agent, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)), (Msg, resolve));
+ Agent.Issue(std::move(resolve), this, readContext.shared_from_this());
+ readContext.StopProcessingParts = true;
+ readContext.BlobWithoutData = blob.Id;
+ return;
+ } else if (blob.Status != NKikimrProto::OK) {
+ return readContext.EndWithError(this, blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id);
+ }
- auto& buffer = readContext.Buffer;
- const ui64 offset = partContext.Offsets[i];
+ auto& buffer = readContext.Buffer;
+ const ui64 offset = partContext.Offsets[i];
- Y_VERIFY(offset < readContext.Size && blob.Buffer.size() <= readContext.Size - offset);
+ Y_VERIFY(offset < readContext.Size && blob.Buffer.size() <= readContext.Size - offset);
- if (!buffer && !offset) {
- buffer = std::move(blob.Buffer);
- buffer.resize(readContext.Size);
- } else {
- if (!buffer) {
- buffer = TString::Uninitialized(readContext.Size);
- }
- memcpy(buffer.Detach() + offset, blob.Buffer.data(), blob.Buffer.size());
+ if (!buffer && !offset) {
+ buffer = std::move(blob.Buffer);
+ buffer.resize(readContext.Size);
+ } else {
+ if (!buffer) {
+ buffer = TString::Uninitialized(readContext.Size);
}
+ memcpy(buffer.Detach() + offset, blob.Buffer.data(), blob.Buffer.size());
}
+ }
- if (!--readContext.NumPartsPending) {
- readContext.EndWithSuccess(this);
- }
+ if (!--readContext.NumPartsPending) {
+ readContext.EndWithSuccess(this);
}
}
void TBlobDepotAgent::TQuery::HandleResolveResult(const TRequestContext::TPtr& context, TEvBlobDepot::TEvResolveResult& msg) {
auto& readContext = context->Obtain<TReadContext>();
+ if (readContext.Terminated) {
+ return;
+ }
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA42, "HandleResolveResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
- (ReadId, readContext.GetTag()), (Msg, msg.Record));
+ (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)), (Msg, msg.Record));
if (msg.Record.GetStatus() != NKikimrProto::OK) {
readContext.EndWithError(this, msg.Record.GetStatus(), msg.Record.GetErrorReason());
} else if (msg.Record.ResolvedKeysSize() == 1) {
@@ -210,18 +226,19 @@ namespace NKikimr::NBlobDepot {
readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to restart read Error# " << error);
}
} else if (!item.GetReliablyWritten()) { // this was unassimilated value and we got NODATA for it
- readContext.EndWithError(this, NKikimrProto::NODATA, {});
+ readContext.EndWithNoData(this);
} else {
- Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " QueryId# " << GetQueryId()
+ STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to read blob: data seems to be lost", (AgentId, Agent.LogId),
+ (QueryId, GetQueryId()), (ReadId, readContext.GetTag()), (Key, Agent.PrettyKey(readContext.ReadArg.Key)),
+ (BlobId, readContext.BlobWithoutData));
+ Y_VERIFY_DEBUG_S(false, "data seems to be lost AgentId# " << Agent.LogId << " QueryId# " << GetQueryId()
<< " ReadId# " << readContext.GetTag() << " BlobId# " << readContext.BlobWithoutData);
- STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to read blob -- data is lost", (AgentId, Agent.LogId),
- (QueryId, GetQueryId()), (ReadId, readContext.GetTag()), (BlobId, readContext.BlobWithoutData));
readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to read BlobId# "
- << readContext.BlobWithoutData << ": data is lost");
+ << readContext.BlobWithoutData << ": data seems to be lost");
}
} else {
Y_VERIFY(!msg.Record.ResolvedKeysSize());
- readContext.EndWithError(this, NKikimrProto::NODATA, {});
+ readContext.EndWithNoData(this);
}
}
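
Editor's note: TReadContext now additionally derives from std::enable_shared_from_this, which is what lets HandleGetResult hand the very same context to the follow-up resolve via readContext.shared_from_this() instead of building a new one. A minimal sketch of that idiom follows; apart from StopProcessingParts and the class names taken from the hunk, the helper is hypothetical.

#include <cassert>
#include <memory>

struct TRequestContext { virtual ~TRequestContext() = default; };

// Mirrors the shape of TReadContext: the second base class lets a member
// function mint another owning pointer to the object it lives in, provided
// the object is already managed by a shared_ptr.
struct TReadContext
    : TRequestContext
    , std::enable_shared_from_this<TReadContext>
{
    bool StopProcessingParts = false;

    std::shared_ptr<TReadContext> ReuseForResolve() {   // hypothetical helper
        StopProcessingParts = true;   // remaining TEvGet parts are ignored
        return shared_from_this();    // the same context travels with the resolve
    }
};

int main() {
    auto ctx = std::make_shared<TReadContext>();
    auto again = ctx->ReuseForResolve();
    assert(again == ctx && ctx.use_count() == 2);
    return 0;
}
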
diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp
index dc830f5cecd..621d5be545d 100644
--- a/ydb/core/blob_depot/agent/request.cpp
+++ b/ydb/core/blob_depot/agent/request.cpp
@@ -36,7 +36,12 @@ namespace NKikimr::NBlobDepot {
});
}
- void TRequestSender::OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response) {
+ void TRequestSender::OnRequestComplete(TRequestInFlight& requestInFlight, TResponse response,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) {
+ if (executionRelay) {
+ const size_t num = SubrequestRelays.erase(executionRelay);
+ Y_VERIFY(num);
+ }
requestInFlight.Unlink();
ProcessResponse(requestInFlight.Id, std::move(requestInFlight.Context), std::move(response));
}
@@ -63,11 +68,15 @@ namespace NKikimr::NBlobDepot {
// TBlobDepotAgent machinery
void TBlobDepotAgent::RegisterRequest(ui64 id, TRequestSender *sender, TRequestContext::TPtr context,
- TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet) {
+ TRequestInFlight::TCancelCallback cancelCallback, bool toBlobDepotTablet,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) {
TRequestsInFlight& map = toBlobDepotTablet ? TabletRequestInFlight : OtherRequestInFlight;
const bool inserted = map.emplace(id, sender, std::move(context), std::move(cancelCallback),
toBlobDepotTablet).second;
Y_VERIFY(inserted);
+ if (executionRelay) {
+ sender->SubrequestRelays.emplace(executionRelay);
+ }
}
template<typename TEvent>
@@ -91,18 +100,20 @@ namespace NKikimr::NBlobDepot {
template<typename TEvent>
void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (AgentId, LogId),
- (Id, ev->Cookie), (Type, TypeName<TEvent>()));
- OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight);
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA17, "HandleOtherResponse", (AgentId, LogId), (Id, ev->Cookie),
+ (Type, TypeName<TEvent>()), (Msg, *ev->Get()));
+ Y_VERIFY(ev->Get()->ExecutionRelay);
+ OnRequestComplete(ev->Cookie, ev->Get(), OtherRequestInFlight, std::move(ev->Get()->ExecutionRelay));
}
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvGetResult::TPtr ev);
template void TBlobDepotAgent::HandleOtherResponse(TEvBlobStorage::TEvPutResult::TPtr ev);
- void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map) {
+ void TBlobDepotAgent::OnRequestComplete(ui64 id, TResponse response, TRequestsInFlight& map,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) {
if (auto node = map.extract(id)) {
auto& requestInFlight = node.value();
- requestInFlight.Sender->OnRequestComplete(requestInFlight, std::move(response));
+ requestInFlight.Sender->OnRequestComplete(requestInFlight, std::move(response), std::move(executionRelay));
}
}
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index 4e97c34738b..697dfb2d495 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -103,7 +103,7 @@ namespace NKikimr::NBlobDepot {
}
void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString buffer) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (AgentId, Agent.LogId),
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA35, "OnRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
(Tag, tag), (Status, status), (Buffer.size, status == NKikimrProto::OK ? buffer.size() : 0),
(ErrorReason, status != NKikimrProto::OK ? buffer : ""));
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 70e50a56f17..d9a2b049f6b 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -11,7 +11,6 @@ namespace NKikimr::NBlobDepot {
std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response;
ui32 ReadsInFlight = 0;
- ui32 ResolvesInFlight = 0;
std::map<TLogoBlobID, TString> FoundBlobs;
std::vector<TRead> Reads;
bool Reverse = false;
@@ -27,10 +26,7 @@ namespace NKikimr::NBlobDepot {
Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, Request.From, Request.To,
Agent.VirtualGroupId);
- IssueResolve();
- }
-
- void IssueResolve() {
+ // issue resolve query
TString from = Request.From.AsBinaryString();
TString to = Request.To.AsBinaryString();
Reverse = Request.To < Request.From;
@@ -50,18 +46,6 @@ namespace NKikimr::NBlobDepot {
item->SetMustRestoreFirst(Request.MustRestoreFirst);
Agent.Issue(std::move(resolve), this, nullptr);
- ++ResolvesInFlight;
- }
-
- void IssueResolve(ui64 tag) {
- NKikimrBlobDepot::TEvResolve resolve;
- auto *item = resolve.AddItems();
- item->SetExactKey(Reads[tag].Id.AsBinaryString());
- item->SetMustRestoreFirst(Request.MustRestoreFirst);
- item->SetCookie(tag);
-
- Agent.Issue(std::move(resolve), this, nullptr);
- ++ResolvesInFlight;
}
void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override {
@@ -81,7 +65,8 @@ namespace NKikimr::NBlobDepot {
}
void HandleResolveResult(ui64 id, TRequestContext::TPtr context, NKikimrBlobDepot::TEvResolveResult& msg) {
- --ResolvesInFlight;
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA47, "HandleResolveResult", (AgentId, Agent.LogId),
+ (QueryId, GetQueryId()), (Msg, msg));
if (msg.GetStatus() != NKikimrProto::OK && msg.GetStatus() != NKikimrProto::OVERRUN) {
return EndWithError(msg.GetStatus(), msg.GetErrorReason());
@@ -139,7 +124,8 @@ namespace NKikimr::NBlobDepot {
case NKikimrProto::OK: {
Y_VERIFY(dataOrErrorReason.size() == read.Id.BlobSize());
const bool inserted = FoundBlobs.try_emplace(read.Id, std::move(dataOrErrorReason)).second;
- Y_VERIFY(inserted);
+ Y_VERIFY_S(inserted, "AgentId# " << Agent.LogId << " QueryId# " << GetQueryId()
+ << " duplicate BlobId# " << read.Id << " received");
break;
}
@@ -157,7 +143,7 @@ namespace NKikimr::NBlobDepot {
}
void CheckAndFinish() {
- if (!ReadsInFlight && !ResolvesInFlight && !Finished) {
+ if (!ReadsInFlight && !Finished) {
for (auto& [id, buffer] : FoundBlobs) {
Y_VERIFY_S(buffer.size() == Request.IsIndexOnly ? 0 : id.BlobSize(), "Id# " << id << " Buffer.size# " << buffer.size());
Response->Responses.emplace_back(id, std::move(buffer));
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 0ce533696a6..5251191bd47 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -454,7 +454,7 @@ namespace NKikimr::NBlobDepot {
};
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT13, "Trim", (Id, Self->GetLogId()), (AgentId, agent.Connection->NodeId),
- (Id, ev->Cookie), (Channel, channelIndex), (InvalidatedStep, invalidatedStep),
+ (Id, ev->Cookie), (Channel, int(channelIndex)), (InvalidatedStep, invalidatedStep),
(GivenIdRanges, channel.GivenIdRanges),
(Agent.GivenIdRanges, agent.GivenIdRanges[channelIndex]),
(WritesInFlight, makeWritesInFlight()));
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 7231cc8226c..463b6979c7c 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -379,6 +379,9 @@ namespace NKikimr::NBlobDepot {
ui64 MaxKeys = 0;
ui32 PrechargeRows = 0;
ui64 PrechargeBytes = 0;
+#ifndef NDEBUG
+ std::set<TKey> KeysInRange = {}; // runtime state
+#endif
};
private:
@@ -490,9 +493,8 @@ namespace NKikimr::NBlobDepot {
}
while (rowset.IsValid()) {
TKey key = TKey::FromBinaryKey(rowset.GetKey(), Data->Self->Config);
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "ScanRange.Load", (Id, Data->Self->GetLogId()), (Left, left),
+ STLOG(PRI_TRACE, BLOB_DEPOT, BDT46, "ScanRange.Load", (Id, Data->Self->GetLogId()), (Left, left),
(Right, right), (Key, key));
- LastProcessedKey.emplace(key);
if (left < key && key < right) {
TValue* const value = Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(),
rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>());
@@ -504,18 +506,16 @@ namespace NKikimr::NBlobDepot {
Processing = Range.Flags & EScanFlags::INCLUDE_END ? key <= Range.End : key < Range.End;
}
}
- Processing = Processing && callback(std::move(key), *value);
+ Processing = Processing && callback(key, *value);
*Progress = true;
} else {
Y_VERIFY_DEBUG(key == left || key == right);
}
+ LastProcessedKey.emplace(std::move(key));
if (!rowset.Next()) {
return false; // we break iteration anyway, because we can't read more data
}
}
- if (!LastProcessedKey || (Range.Flags & EScanFlags::REVERSE ? left < *LastProcessedKey : *LastProcessedKey < right)) {
- LastProcessedKey.emplace(Range.Flags & EScanFlags::REVERSE ? left : right);
- }
return Processing;
};
};
@@ -526,7 +526,7 @@ namespace NKikimr::NBlobDepot {
template<typename TCallback>
bool ScanRange(TScanRange& range, NTabletFlatExecutor::TTransactionContext *txc, bool *progress, TCallback&& callback) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "ScanRange", (Id, Self->GetLogId()), (Begin, range.Begin), (End, range.End),
+ STLOG(PRI_TRACE, BLOB_DEPOT, BDT76, "ScanRange", (Id, Self->GetLogId()), (Begin, range.Begin), (End, range.End),
(Flags, range.Flags), (MaxKeys, range.MaxKeys));
const bool reverse = range.Flags & EScanFlags::REVERSE;
@@ -534,11 +534,13 @@ namespace NKikimr::NBlobDepot {
bool res = true;
- auto issue = [&](TKey&& key, const TValue& value) {
+ auto issue = [&](const TKey& key, const TValue& value) {
Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_BEGIN ? range.Begin <= key : range.Begin < key);
Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_END ? key <= range.End : key < range.End);
-
- if (!callback(key, value) || (range.MaxKeys && !--range.MaxKeys)) {
+#ifndef NDEBUG
+ Y_VERIFY(range.KeysInRange.insert(key).second); // ensure that the generated key is unique
+#endif
+ if (!callback(key, value) || !--range.MaxKeys) {
return false; // scan aborted by user or finished scanning the required range
} else {
// remove already scanned items from the range query
@@ -549,7 +551,7 @@ namespace NKikimr::NBlobDepot {
const auto& from = reverse ? TKey::Min() : range.Begin;
const auto& to = reverse ? range.End : TKey::Max();
LoadedKeys.EnumInRange(from, to, reverse, [&](const TKey& left, const TKey& right, bool isRangeLoaded) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "ScanRange.Step", (Id, Self->GetLogId()), (Left, left), (Right, right),
+ STLOG(PRI_TRACE, BLOB_DEPOT, BDT83, "ScanRange.Step", (Id, Self->GetLogId()), (Left, left), (Right, right),
(IsRangeLoaded, isRangeLoaded), (From, from), (To, to));
if (!isRangeLoaded) {
// we have to load range (left, right), not including both ends
@@ -565,7 +567,7 @@ namespace NKikimr::NBlobDepot {
break;
} else if (range.Flags & EScanFlags::INCLUDE_BEGIN ? key < range.Begin : key <= range.Begin) {
return false; // just left the left side of the range
- } else if ((key != range.End || range.Flags & EScanFlags::INCLUDE_END) && !issue(TKey(key), value)) {
+ } else if ((key != range.End || range.Flags & EScanFlags::INCLUDE_END) && !issue(key, value)) {
return false; // enough keys processed
}
}
@@ -576,11 +578,15 @@ namespace NKikimr::NBlobDepot {
const auto& [key, value] = *it;
if (range.Flags & EScanFlags::INCLUDE_END ? range.End < key : range.End <= key) {
return false; // just left the right side of the range
- } else if ((key != range.Begin || range.Flags & EScanFlags::INCLUDE_BEGIN) && !issue(TKey(key), value)) {
+ } else if ((key != range.Begin || range.Flags & EScanFlags::INCLUDE_BEGIN) && !issue(key, value)) {
return false; // enough keys processed
}
}
}
+ if (!loader.LastProcessedKey || (reverse & EScanFlags::REVERSE ? left < *loader.LastProcessedKey :
+ *loader.LastProcessedKey < right)) {
+ loader.LastProcessedKey.emplace(reverse ? left : right);
+ }
return true;
});
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index ab8820c93b2..c40dc750612 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -131,6 +131,33 @@ NActors::NLog::EPriority PriorityForStatusOutbound(NKikimrProto::EReplyStatus st
NActors::NLog::EPriority PriorityForStatusResult(NKikimrProto::EReplyStatus status);
NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus status);
+inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) {
+ switch (const ui32 type = ev.Type()) {
+#define XX(T) \
+ case TEvBlobStorage::Ev##T: \
+ static_cast<TEvBlobStorage::TEv##T&>(ev).ExecutionRelay = std::move(executionRelay); \
+ break; \
+ case TEvBlobStorage::Ev##T##Result: \
+ static_cast<TEvBlobStorage::TEv##T##Result&>(ev).ExecutionRelay = std::move(executionRelay); \
+ break; \
+ //
+
+ XX(Put)
+ XX(Get)
+ XX(Block)
+ XX(Discover)
+ XX(Range)
+ XX(CollectGarbage)
+ XX(Status)
+ XX(Patch)
+ XX(Assimilate)
+#undef XX
+
+ default:
+ Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
+ }
+}
+
template<typename TDerived>
class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActor<TDerived>> {
public:
@@ -141,7 +168,8 @@ public:
TBlobStorageGroupRequestActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TIntrusivePtr<TGroupQueues> groupQueues,
TIntrusivePtr<TBlobStorageGroupProxyMon> mon, const TActorId& source, ui64 cookie, NWilson::TTraceId traceId,
NKikimrServices::EServiceKikimr logComponent, bool logAccEnabled, TMaybe<TGroupStat::EKind> latencyQueueKind,
- TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name)
+ TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay)
: TActor<TBlobStorageGroupRequestActor<TDerived>>(&TThis::InitialStateFunc, TDerived::ActorActivityType())
, Info(std::move(info))
, GroupQueues(std::move(groupQueues))
@@ -155,6 +183,7 @@ public:
, LatencyQueueKind(latencyQueueKind)
, RequestStartTime(now)
, RacingDomains(&Info->GetTopology())
+ , ExecutionRelay(std::move(executionRelay))
{
TDerived::ActiveCounter(Mon)->Inc();
Span
@@ -272,6 +301,9 @@ public:
// make NodeWarden restart the query just after proxy reconfiguration
Y_VERIFY_DEBUG(RestartCounter < 100);
auto q = self.RestartQuery(RestartCounter + 1);
+ if (q->Type() != TEvBlobStorage::EvBunchOfEvents) {
+ SetExecutionRelay(*q, std::exchange(ExecutionRelay, {}));
+ }
++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)];
const TActorId& proxyId = MakeBlobStorageProxyID(Info->GroupID);
TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span.GetTraceId()));
@@ -414,6 +446,9 @@ public:
}
void PassAway() override {
+ // ensure we didn't keep execution relay on occasion
+ Y_VERIFY_DEBUG_S(!ExecutionRelay, LogCtx.RequestPrefix << " actor died without properly sending response");
+
// ensure that we are dying for the first time
Y_VERIFY(!std::exchange(Dead, true));
TDerived::ActiveCounter(Mon)->Dec();
@@ -452,6 +487,13 @@ public:
#undef XX
}
+ if (ExecutionRelay) {
+ SetExecutionRelay(*ev, std::exchange(ExecutionRelay, {}));
+ ExecutionRelayUsed = true;
+ } else {
+ Y_VERIFY(!ExecutionRelayUsed);
+ }
+
// ensure that we are dying for the first time
Y_VERIFY(!Dead);
if (RequestHandleClass && PoolCounters) {
@@ -543,6 +585,8 @@ private:
std::deque<std::unique_ptr<IEventHandle>> PostponedQ;
TBlobStorageGroupInfo::TGroupFailDomains RacingDomains; // a set of domains we've received RACE from
TActorId ProxyActorId;
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
+ bool ExecutionRelayUsed = false;
};
void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id,
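
Editor's note: SetExecutionRelay above relies on an X-macro, where one XX(T) entry expands into the case labels for both the request event and its *Result counterpart, so supporting a new event kind is a single added line. The sketch below reproduces that dispatch shape outside the actor framework; the event hierarchy, enum values, and the default-case omission are made up for illustration, only the overall pattern comes from the hunk.

#include <cassert>
#include <memory>
#include <utility>

struct TRelay {};

// Stand-in for IEventBase: a runtime type tag instead of RTTI.
struct IEvent { virtual ~IEvent() = default; virtual int Type() const = 0; };

enum { EvPut = 1, EvPutResult, EvGet, EvGetResult };

#define DECLARE_EVENT(NAME, TYPE)                   \
    struct TEv##NAME : IEvent {                     \
        std::shared_ptr<TRelay> ExecutionRelay;     \
        int Type() const override { return TYPE; }  \
    };

DECLARE_EVENT(Put, EvPut)
DECLARE_EVENT(PutResult, EvPutResult)
DECLARE_EVENT(Get, EvGet)
DECLARE_EVENT(GetResult, EvGetResult)
#undef DECLARE_EVENT

// Same shape as SetExecutionRelay: one XX(T) covers the request and its result.
void SetRelay(IEvent& ev, std::shared_ptr<TRelay> relay) {
    switch (ev.Type()) {
#define XX(T)                                                                   \
        case Ev##T:                                                             \
            static_cast<TEv##T&>(ev).ExecutionRelay = std::move(relay);         \
            break;                                                              \
        case Ev##T##Result:                                                     \
            static_cast<TEv##T##Result&>(ev).ExecutionRelay = std::move(relay); \
            break;

        XX(Put)
        XX(Get)
#undef XX
    }
}

int main() {
    auto relay = std::make_shared<TRelay>();
    TEvGetResult result;
    SetRelay(result, relay);
    assert(result.ExecutionRelay == relay);
    return 0;
}
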
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
index 15138025e56..3e76e63db80 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
@@ -268,7 +268,7 @@ public:
NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_ASSIMILATE, false, {}, now, storagePoolCounters, ev->RestartCounter,
- "DSProxy.Assimilate")
+ "DSProxy.Assimilate", std::move(ev->ExecutionRelay))
, SkipBlocksUpTo(ev->SkipBlocksUpTo)
, SkipBarriersUpTo(ev->SkipBarriersUpTo)
, SkipBlobsUpTo(ev->SkipBlobsUpTo)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
index f381d505a7d..117b9f65886 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
@@ -139,7 +139,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter,
- "DSProxy.Block")
+ "DSProxy.Block", std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, Generation(ev->Generation)
, Deadline(ev->Deadline)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
index d5875e70955..de0d07e30c5 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
@@ -111,7 +111,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
auto result = std::make_unique<TEvBlobStorage::TEvCollectGarbageResult>(status, TabletId, RecordGeneration,
PerGenerationCounter, Channel);
result->ErrorReason = ErrorReason;
- A_LOG_LOG_S(true, PriorityForStatusOutbound(status), "DSPC02", "Result# " << result->Print(false));
+ A_LOG_LOG_S(true, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "DSPC02", "Result# " << result->Print(false));
SendResponseAndDie(std::move(result));
}
@@ -147,7 +147,7 @@ public:
NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter,
- "DSProxy.CollectGarbage")
+ "DSProxy.CollectGarbage", std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, RecordGeneration(ev->RecordGeneration)
, PerGenerationCounter(ev->PerGenerationCounter)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
index b9201f16922..ca296d76ad3 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
@@ -886,7 +886,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter,
- "DSProxy.Discover")
+ "DSProxy.Discover", std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, ReadBody(ev->ReadBody)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
index 9a5c04380ef..31ceba6c386 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
@@ -464,7 +464,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie,
std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters,
- ev->RestartCounter, "DSProxy.Discover(mirror-3-dc)")
+ ev->RestartCounter, "DSProxy.Discover(mirror-3-dc)", std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, StartTime(now)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
index 2b350fe8a6c..4e7ff9c2ce7 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
@@ -36,7 +36,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie,
std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters,
- ev->RestartCounter, "DSProxy.Discover(mirror-3of4)")
+ ev->RestartCounter, "DSProxy.Discover(mirror-3of4)", std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, StartTime(now)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
index 0b89004dd24..bd5ed008eab 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
@@ -465,6 +465,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvGet, requestSize, duration.SecondsFloat() * 1000.0, tabletId,
evResult->GroupId, channel, NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()),
success);
+ A_LOG_LOG_S(true, success ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPG68", "Result# " << evResult->Print(false));
return SendResponseAndDie(std::unique_ptr<TEvBlobStorage::TEvGetResult>(evResult.Release()));
}
@@ -493,7 +494,8 @@ public:
TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_GET, ev->IsVerboseNoDataEnabled || ev->CollectDebugInfo,
- latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get")
+ latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get",
+ std::move(ev->ExecutionRelay))
, GetImpl(info, state, ev, std::move(nodeLayout), LogCtx.RequestPrefix)
, Orbit(std::move(ev->Orbit))
, Deadline(ev->Deadline)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
index 5a80f045d55..81198774537 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
@@ -124,14 +124,15 @@ public:
TString DumpQuery() const {
TStringStream str;
- str << "{";
+ str << '{';
+ str << "MustRestoreFirst# " << MustRestoreFirst;
for (ui32 i = 0; i < QuerySize; ++i) {
- str << (i ? " " : "")
+ str << ' '
<< Queries[i].Id
- << "@" << Queries[i].Shift
- << ":" << Queries[i].Size;
+ << '@' << Queries[i].Shift
+ << ':' << Queries[i].Size;
}
- str << "}";
+ str << '}';
return str.Str();
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
index 47d60411f30..8e46ae6c369 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
@@ -285,6 +285,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
? NKikimrProto::ERROR
: NKikimrProto::NO_GROUP;
auto response = ev->Get()->MakeErrorResponse(status, ErrorDescription, GroupId);
+ SetExecutionRelay(*response, std::move(ev->Get()->ExecutionRelay));
NActors::NLog::EPriority priority = CheckPriorityForErrorState();
LOG_LOG_S(*TlsActivationContext, priority, NKikimrServices::BS_PROXY, ExtraLogInfo << "Group# " << GroupId
<< " HandleError ev# " << ev->Get()->Print(false)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
index 82499e11089..c15adde2177 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
@@ -35,7 +35,7 @@ class TBlobStorageGroupIndexRestoreGetRequest
THashMap<TLogoBlobID, std::pair<bool, bool>> KeepFlags;
void ReplyAndDie(NKikimrProto::EReplyStatus status) {
- A_LOG_DEBUG_S("DSPI14", "ReplyAndDie"
+ A_LOG_INFO_S("DSPI14", "ReplyAndDie"
<< " Reply with status# " << NKikimrProto::EReplyStatus_Name(status)
<< " PendingResult# " << (PendingResult ? PendingResult->ToString().data() : "nullptr"));
if (status != NKikimrProto::OK) {
@@ -270,7 +270,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_INDEXRESTOREGET, false, latencyQueueKind, now, storagePoolCounters,
- ev->RestartCounter, "DSProxy.IndexRestoreGet")
+ ev->RestartCounter, "DSProxy.IndexRestoreGet", std::move(ev->ExecutionRelay))
, QuerySize(ev->QuerySize)
, Queries(ev->Queries.Release())
, Deadline(ev->Deadline)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
index 28d623e7d77..e16fb06f16f 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
@@ -97,7 +97,7 @@ public:
NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_MULTICOLLECT, false, {}, now, storagePoolCounters, 0,
- "DSProxy.MultiCollect")
+ "DSProxy.MultiCollect", std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, RecordGeneration(ev->RecordGeneration)
, PerGenerationCounter(ev->PerGenerationCounter)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
index e5d734c9d78..eba8d52a259 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
@@ -97,7 +97,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_MULTIGET, false, latencyQueueKind, now, storagePoolCounters, 0,
- "DSProxy.MultiGet")
+ "DSProxy.MultiGet", std::move(ev->ExecutionRelay))
, QuerySize(ev->QuerySize)
, Queries(ev->Queries.Release())
, Deadline(ev->Deadline)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
index b067f4a5f65..8950f18d7d3 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
@@ -108,7 +108,7 @@ public:
bool useVPatch = false)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_PATCH, false, {}, now, storagePoolCounters,
- ev->RestartCounter, "DSProxy.Patch")
+ ev->RestartCounter, "DSProxy.Patch", std::move(ev->ExecutionRelay))
, OriginalGroupId(ev->OriginalGroupId)
, OriginalId(ev->OriginalId)
, PatchedId(ev->PatchedId)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index 5c682482e68..a9ce350f309 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -329,7 +329,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> &putResult, ui64 blobIdx) {
NKikimrProto::EReplyStatus status = putResult->Status;
- A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_DEBUG : NLog::PRI_NOTICE, "BPP21",
+ A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPP21",
"SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent
<< " PutImpl.Blobs.size# " << PutImpl.Blobs.size()
<< " Last# " << (ResponsesSent + 1 == PutImpl.Blobs.size() ? "true" : "false"));
@@ -353,6 +353,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
LWTRACK(DSProxyPutReply, PutImpl.Blobs[blobIdx].Orbit);
putResult->Orbit = std::move(PutImpl.Blobs[blobIdx].Orbit);
putResult->WrittenBeyondBarrier = PutImpl.WrittenBeyondBarrier[blobIdx];
+ putResult->ExecutionRelay = std::move(PutImpl.Blobs[blobIdx].ExecutionRelay);
if (!IsManyPuts) {
SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr);
} else {
@@ -402,6 +403,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
item.Span.GetTraceId()
));
put->RestartCounter = counter;
+ put->ExecutionRelay = std::move(item.ExecutionRelay);
}
return ev;
}
@@ -429,7 +431,7 @@ public:
bool enableRequestMod3x3ForMinLatecy)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters,
- ev->RestartCounter, "DSProxy.Put")
+ ev->RestartCounter, "DSProxy.Put", nullptr)
, PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy, source, cookie, Span.GetTraceId())
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
, Deadline(ev->Deadline)
@@ -472,7 +474,7 @@ public:
bool enableRequestMod3x3ForMinLatecy)
: TBlobStorageGroupRequestActor(info, state, mon, TActorId(), 0, NWilson::TTraceId(),
NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters,
- MaxRestartCounter(events), "DSProxy.Put")
+ MaxRestartCounter(events), "DSProxy.Put", nullptr)
, PutImpl(info, state, events, mon, handleClass, tactic, enableRequestMod3x3ForMinLatecy)
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
, IsManyPuts(true)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index 2695ea1ea83..52d22ffec5e 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -55,9 +55,11 @@ private:
bool Replied = false;
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
NWilson::TSpan Span;
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
TBlobInfo(TLogoBlobID id, TRope&& buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId,
- NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single)
+ NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single,
+ std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay)
: BlobId(id)
, Buffer(std::move(buffer))
, BufferSize(Buffer.size())
@@ -66,6 +68,7 @@ private:
, Orbit(std::move(orbit))
, ExtraBlockChecks(std::move(extraBlockChecks))
, Span(single ? NWilson::TSpan() : NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Put.Blob"))
+ , ExecutionRelay(std::move(executionRelay))
{}
void Output(IOutputStream& s) const {
@@ -118,7 +121,7 @@ public:
, Tactic(ev->Tactic)
{
Blobs.emplace_back(ev->Id, TRope(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit),
- std::move(ev->ExtraBlockChecks), true);
+ std::move(ev->ExtraBlockChecks), true, std::move(ev->ExecutionRelay));
auto& blob = Blobs.back();
LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic,
@@ -147,7 +150,7 @@ public:
Y_VERIFY(msg.HandleClass == putHandleClass);
Y_VERIFY(msg.Tactic == tactic);
Blobs.emplace_back(msg.Id, TRope(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId),
- std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false);
+ std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false, std::move(msg.ExecutionRelay));
Deadline = Max(Deadline, msg.Deadline);
auto& blob = Blobs.back();
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
index fbcf7312169..d1783af11f8 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
@@ -299,7 +299,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
if (To < From) {
std::reverse(result->Responses.begin(), result->Responses.end());
}
- A_LOG_LOG_S(true, PriorityForStatusOutbound(status), "DSR05", "Result# " << result->Print(false));
+ A_LOG_LOG_S(true, NLog::PRI_INFO, "DSR05", "Result# " << result->Print(false));
SendReply(result);
}
@@ -307,7 +307,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
std::unique_ptr<TEvBlobStorage::TEvRangeResult> result(new TEvBlobStorage::TEvRangeResult(
status, From, To, Info->GroupID));
result->ErrorReason = ErrorReason;
- A_LOG_LOG_S(true, PriorityForStatusOutbound(status), "DSR06", "Result# " << result->Print(false));
+ A_LOG_LOG_S(true, NLog::PRI_NOTICE, "DSR06", "Result# " << result->Print(false));
SendReply(result);
}
@@ -342,7 +342,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_RANGE, false, {}, now, storagePoolCounters,
- ev->RestartCounter, "DSProxy.Range")
+ ev->RestartCounter, "DSProxy.Range", std::move(ev->ExecutionRelay))
, TabletId(ev->TabletId)
, From(ev->From)
, To(ev->To)
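
For Range (and, below, Status) requests the relay is forwarded into the base request actor's constructor instead of being kept per blob. What the base class does with it is not visible in this hunk; the sketch below shows one plausible reading under that assumption, with simplified stand-in types:

    // Sketch only (placeholder types, not the YDB classes): the base request
    // actor takes ownership of the relay at construction, which would let it
    // attach the relay to whatever reply it eventually sends on behalf of a
    // concrete request such as Range or Status.
    #include <cassert>
    #include <memory>
    #include <utility>

    struct TExecutionRelay {};

    struct TResultBase {
        std::shared_ptr<TExecutionRelay> ExecutionRelay;
    };

    class TRequestBase {
    public:
        explicit TRequestBase(std::shared_ptr<TExecutionRelay> relay)
            : ExecutionRelay(std::move(relay))
        {}

    protected:
        // called by a derived request when its final reply is ready
        void StampReply(TResultBase& result) {
            result.ExecutionRelay = std::move(ExecutionRelay);
        }

    private:
        std::shared_ptr<TExecutionRelay> ExecutionRelay;
    };

    class TRangeRequest : public TRequestBase {
    public:
        using TRequestBase::TRequestBase;

        TResultBase Finish() {
            TResultBase result;
            StampReply(result);       // reply now carries the caller's relay
            return result;
        }
    };

    int main() {
        TRangeRequest request(std::make_shared<TExecutionRelay>());
        assert(request.Finish().ExecutionRelay);
    }
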
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
index 87b4d72b5df..6a622b7b7a2 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
@@ -91,6 +91,7 @@ namespace NKikimr {
std::unique_ptr<TEvBlobStorage::TEvPutResult> result(
new TEvBlobStorage::TEvPutResult(NKikimrProto::ERROR, ev->Get()->Id, 0, GroupId, 0.f));
result->ErrorReason = str.Str();
+ result->ExecutionRelay = std::move(ev->Get()->ExecutionRelay);
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::BS_PROXY,
"HandleNormal ev# " << ev->Get()->Print(false)
<< " result# " << result->Print(false)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp
index dfbe247be85..8ddf88414f2 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp
@@ -90,7 +90,7 @@ public:
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_STATUS, false, {}, now, storagePoolCounters,
- ev->RestartCounter, "DSProxy.Status")
+ ev->RestartCounter, "DSProxy.Status", std::move(ev->ExecutionRelay))
, Deadline(ev->Deadline)
, Requests(0)
, Responses(0)
diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
index b9404a73b12..4917dad2724 100644
--- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
+++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
@@ -15,43 +15,50 @@ namespace NKikimr {
void Handle(TEvBlobStorage::TEvPut::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM01, "TEvPut", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, Model->Handle(ev->Get()),0, ev->Cookie);
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}
void Handle(TEvBlobStorage::TEvGet::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM02, "TEvGet", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie);
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}
void Handle(TEvBlobStorage::TEvBlock::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM03, "TEvBlock", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie);
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}
void Handle(TEvBlobStorage::TEvDiscover::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM04, "TEvDiscover", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie);
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}
void Handle(TEvBlobStorage::TEvRange::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM05, "TEvRange", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie);
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}
void Handle(TEvBlobStorage::TEvCollectGarbage::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM06, "TEvCollectGarbage", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, Model->Handle(ev->Get()), 0, ev->Cookie);
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), Model->Handle(ev->Get())), 0, ev->Cookie);
}
void Handle(TEvBlobStorage::TEvStatus::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM07, "TEvStatus", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, new TEvBlobStorage::TEvStatusResult(NKikimrProto::OK, Model->GetStorageStatusFlags()), 0,
- ev->Cookie);
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), new TEvBlobStorage::TEvStatusResult(NKikimrProto::OK,
+ Model->GetStorageStatusFlags())), 0, ev->Cookie);
}
void Handle(TEvBlobStorage::TEvAssimilate::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM09, "TEvAssimilate", (Msg, ev->Get()->ToString()));
- Send(ev->Sender, new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::ERROR, "not implemented"));
+ Send(ev->Sender, CopyExecutionRelay(ev->Get(), new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::ERROR,
+ "not implemented")), 0, ev->Cookie);
+ }
+
+ template<typename TOut, typename TIn>
+ TOut *CopyExecutionRelay(TIn *in, TOut *out) {
+ out->ExecutionRelay = std::move(in->ExecutionRelay);
+ return out;
}
void HandlePoison(TEvents::TEvPoisonPill::TPtr& ev) {
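
The CopyExecutionRelay helper added above keeps each mock handler a one-liner: it moves the relay from the incoming event to the freshly built reply and returns the reply. A self-contained sketch of the same template, with placeholder event types:

    // Sketch only: the move-and-return template with placeholder event
    // structs; in the mock it lets every Send(...) call stay on one line.
    #include <memory>
    #include <utility>

    struct TExecutionRelay {};

    struct TEvPut       { std::shared_ptr<TExecutionRelay> ExecutionRelay; };
    struct TEvPutResult { std::shared_ptr<TExecutionRelay> ExecutionRelay; };

    template <typename TOut, typename TIn>
    TOut* CopyExecutionRelay(TIn* in, TOut* out) {
        out->ExecutionRelay = std::move(in->ExecutionRelay);
        return out;                    // returned pointer goes straight to Send()
    }

    int main() {
        TEvPut put;
        put.ExecutionRelay = std::make_shared<TExecutionRelay>();
        TEvPutResult* result = CopyExecutionRelay(&put, new TEvPutResult);
        delete result;                 // the actor system would consume the event
    }
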
diff --git a/ydb/core/mind/bscontroller/virtual_group.cpp b/ydb/core/mind/bscontroller/virtual_group.cpp
index 5e25b1d46b2..80de996e32f 100644
--- a/ydb/core/mind/bscontroller/virtual_group.cpp
+++ b/ydb/core/mind/bscontroller/virtual_group.cpp
@@ -338,8 +338,10 @@ namespace NKikimr::NBsController {
void OnPipeError(TActorId clientId) {
if (clientId == HivePipeId) {
HivePipeId = {};
+ Bootstrap();
} else if (clientId == BlobDepotPipeId) {
BlobDepotPipeId = {};
+ Bootstrap();
}
}
@@ -371,13 +373,12 @@ namespace NKikimr::NBsController {
}
NTabletPipe::CloseAndForgetClient(SelfId(), HivePipeId);
+ ConfigureBlobDepot();
}
void Handle(TEvHive::TEvTabletCreationResult::TPtr ev) {
STLOG(PRI_INFO, BS_CONTROLLER, BSCVG05, "received TEvTabletCreationResult", (TabletId, Self->TabletID()),
(Msg, ev->Get()->Record));
-
- ConfigureBlobDepot();
}
void ConfigureBlobDepot() {
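
The virtual_group.cpp change makes OnPipeError restart the setup sequence via Bootstrap() when either pipe breaks, and issues ConfigureBlobDepot() right after the Hive pipe is closed rather than waiting for TEvTabletCreationResult. A minimal stand-alone sketch of the retry-on-disconnect part, with simplified stand-in types (not the real actor classes):

    // Sketch only: when a tablet pipe to Hive or to the BlobDepot breaks,
    // forget the broken client id and restart the whole setup sequence
    // instead of leaving the operation stuck.
    #include <cstdint>
    #include <iostream>

    using TActorId = std::uint64_t;

    struct TVirtualGroupSetup {
        TActorId HivePipeId = 0;
        TActorId BlobDepotPipeId = 0;

        void Bootstrap() {
            // re-establish the pipes and resend pending requests (simplified)
            std::cout << "restarting setup sequence\n";
        }

        void OnPipeError(TActorId clientId) {
            if (clientId == HivePipeId) {
                HivePipeId = 0;
                Bootstrap();           // retry instead of stalling
            } else if (clientId == BlobDepotPipeId) {
                BlobDepotPipeId = 0;
                Bootstrap();
            }
        }
    };

    int main() {
        TVirtualGroupSetup setup;
        setup.HivePipeId = 42;
        setup.OnPipeError(42);         // prints "restarting setup sequence"
    }
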