summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <[email protected]>2022-08-08 14:13:50 +0300
committeralexvru <[email protected]>2022-08-08 14:13:50 +0300
commit34d70b4d5019c91fed33d3aab2cdda46b61dd734 (patch)
tree9f25bdec505dcceca0ce9ca194501662b9eebd86
parent203ba5ba0339b44f13dea1b1d03058a541e3d6e7 (diff)
Support group decommission machinery
-rw-r--r--ydb/core/blob_depot/agent.cpp21
-rw-r--r--ydb/core/blob_depot/agent/agent.cpp23
-rw-r--r--ydb/core/blob_depot/agent/agent.h2
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h34
-rw-r--r--ydb/core/blob_depot/agent/blob_mapping_cache.cpp3
-rw-r--r--ydb/core/blob_depot/agent/blob_mapping_cache.h4
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp1
-rw-r--r--ydb/core/blob_depot/agent/proxy.cpp6
-rw-r--r--ydb/core/blob_depot/agent/query.cpp4
-rw-r--r--ydb/core/blob_depot/agent/read.cpp36
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_get.cpp6
-rw-r--r--ydb/core/blob_depot/agent/storage_range.cpp2
-rw-r--r--ydb/core/blob_depot/assimilator.cpp39
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h8
-rw-r--r--ydb/core/blob_depot/data.cpp4
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp14
-rw-r--r--ydb/core/blob_depot/op_apply_config.cpp8
-rw-r--r--ydb/core/blob_depot/op_load.cpp10
-rw-r--r--ydb/core/blob_depot/types.h7
-rw-r--r--ydb/core/blobstorage/base/blobstorage_events.h8
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h38
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp3
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_impl.h11
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp14
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_range.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_request.cpp8
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp10
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_group.cpp25
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h3
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_mon.cpp4
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp3
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp58
-rw-r--r--ydb/core/protos/blob_depot.proto8
40 files changed, 286 insertions, 155 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index cc731030d48..449b76fb457 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -13,6 +13,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "TEvServerDisconnected", (TabletId, TabletID()),
(PipeServerId, ev->Get()->ServerId));
+
const auto it = PipeServerToNode.find(ev->Get()->ServerId);
Y_VERIFY(it != PipeServerToNode.end());
if (const auto& nodeId = it->second) {
@@ -27,6 +28,8 @@ namespace NKikimr::NBlobDepot {
}
}
PipeServerToNode.erase(it);
+
+ RegisterAgentQ.erase(ev->Get()->ServerId);
}
void TBlobDepot::OnAgentDisconnect(TAgent& agent) {
@@ -35,9 +38,16 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) {
+ if (!Configured || (Config.HasDecommitGroupId() && !DecommitBlocksFinished)) {
+ const auto [it, inserted] = RegisterAgentQ.emplace(ev->Recipient, ev.Release());
+ Y_VERIFY(inserted);
+ return;
+ }
+
const ui32 nodeId = ev->Sender.NodeId();
const TActorId& pipeServerId = ev->Recipient;
const auto& req = ev->Get()->Record;
+
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId),
(PipeServerId, pipeServerId), (Id, ev->Cookie));
@@ -267,4 +277,15 @@ namespace NKikimr::NBlobDepot {
Execute(std::make_unique<TTxInvokeCallback>(this, ev));
}
+ void TBlobDepot::ProcessRegisterAgentQ() {
+ if (!Configured || (Config.HasDecommitGroupId() && !DecommitBlocksFinished)) {
+ return;
+ }
+
+ for (auto& [pipeServerId, ev] : std::exchange(RegisterAgentQ, {})) {
+ TAutoPtr<IEventHandle> tmp(ev.release());
+ Receive(tmp, TActivationContext::AsActorContext());
+ }
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp
index c4f56e58a02..b16abfc0dc9 100644
--- a/ydb/core/blob_depot/agent/agent.cpp
+++ b/ydb/core/blob_depot/agent/agent.cpp
@@ -5,24 +5,35 @@
namespace NKikimr::NBlobDepot {
- TBlobDepotAgent::TBlobDepotAgent(ui32 virtualGroupId)
- : TActor(&TThis::StateFunc)
- , TRequestSender(*this)
+ TBlobDepotAgent::TBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId)
+ : TRequestSender(*this)
, VirtualGroupId(virtualGroupId)
+ , ProxyId(proxyId)
, AgentInstanceId(RandomNumber<ui64>())
, BlocksManagerPtr(new TBlocksManager(*this))
, BlocksManager(*BlocksManagerPtr)
, BlobMappingCachePtr(new TBlobMappingCache(*this))
, BlobMappingCache(*BlobMappingCachePtr)
{
- Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual);
+ if (info) {
+ Y_VERIFY(info->BlobDepotId);
+ TabletId = *info->BlobDepotId;
+ }
}
TBlobDepotAgent::~TBlobDepotAgent()
{}
- IActor *CreateBlobDepotAgent(ui32 virtualGroupId) {
- return new TBlobDepotAgent(virtualGroupId);
+ void TBlobDepotAgent::Bootstrap() {
+ Become(&TThis::StateFunc);
+
+ if (TabletId) {
+ ConnectToBlobDepot();
+ }
+ }
+
+ IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) {
+ return new TBlobDepotAgent(virtualGroupId, std::move(info), proxyId);
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent.h b/ydb/core/blob_depot/agent/agent.h
index 4d66f9967b4..3eb49f8f5bb 100644
--- a/ydb/core/blob_depot/agent/agent.h
+++ b/ydb/core/blob_depot/agent/agent.h
@@ -4,6 +4,6 @@
namespace NKikimr::NBlobDepot {
- IActor *CreateBlobDepotAgent(ui32 virtualGroupId);
+ IActor *CreateBlobDepotAgent(ui32 groupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId);
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 28fc2b71179..25e1b937020 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -30,10 +30,8 @@ namespace NKikimr::NBlobDepot {
using TPtr = std::shared_ptr<TRequestContext>;
};
- using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;
-
struct TTabletDisconnected {};
- struct TKeyResolved { const TValueChain* ValueChain; };
+ struct TKeyResolved { const TResolvedValueChain* ValueChain; };
class TRequestSender {
THashMap<ui64, TRequestContext::TPtr> RequestsInFlight;
@@ -74,18 +72,21 @@ namespace NKikimr::NBlobDepot {
};
class TBlobDepotAgent
- : public TActor<TBlobDepotAgent>
+ : public TActorBootstrapped<TBlobDepotAgent>
, public TRequestSender
{
const ui32 VirtualGroupId;
+ const TActorId ProxyId;
const ui64 AgentInstanceId;
ui64 TabletId = Max<ui64>();
TActorId PipeId;
public:
- TBlobDepotAgent(ui32 virtualGroupId);
+ TBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId);
~TBlobDepotAgent();
+ void Bootstrap();
+
#define FORWARD_STORAGE_PROXY(TYPE) fFunc(TEvBlobStorage::TYPE, HandleStorageProxy);
STRICT_STFUNC(StateFunc,
cFunc(TEvents::TSystem::Poison, PassAway);
@@ -108,6 +109,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobStorage::TEvPutResult, HandleOtherResponse);
ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY)
+ hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle);
);
#undef FORWARD_STORAGE_PROXY
@@ -120,14 +122,18 @@ namespace NKikimr::NBlobDepot {
const auto& info = ev->Get()->Info;
Y_VERIFY(info);
Y_VERIFY(info->BlobDepotId);
- TabletId = *info->BlobDepotId;
- if (TabletId) {
- ConnectToBlobDepot();
- }
+ if (TabletId != *info->BlobDepotId) {
+ TabletId = *info->BlobDepotId;
+ if (TabletId) {
+ ConnectToBlobDepot();
+ }
- for (auto& ev : std::exchange(PendingEventQ, {})) {
- TActivationContext::Send(ev.release());
+ for (auto& ev : std::exchange(PendingEventQ, {})) {
+ TActivationContext::Send(ev.release());
+ }
}
+
+ TActivationContext::Send(ev->Forward(ProxyId));
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -168,6 +174,7 @@ namespace NKikimr::NBlobDepot {
};
ui32 BlobDepotGeneration = 0;
+ std::optional<ui32> DecommitGroupId;
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
@@ -233,6 +240,7 @@ namespace NKikimr::NBlobDepot {
TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries;
void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
+ void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
TQuery *CreateQuery(TAutoPtr<IEventHandle> ev);
template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev);
@@ -293,8 +301,8 @@ namespace NKikimr::NBlobDepot {
struct TReadContext;
- bool IssueRead(const TValueChain& values, ui64 offset, ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass,
- bool mustRestoreFirst, TQuery *query, ui64 tag, bool vg, TString *error);
+ bool IssueRead(const TResolvedValueChain& values, ui64 offset, ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass,
+ bool mustRestoreFirst, TQuery *query, ui64 tag, TString *error);
void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg);
diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
index 479cb057bdd..71b1d822d82 100644
--- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
+++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
@@ -21,7 +21,8 @@ namespace NKikimr::NBlobDepot {
}
}
- const TValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context) {
+ const TResolvedValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query,
+ TRequestContext::TPtr context) {
const auto [it, inserted] = Cache.try_emplace(std::move(key));
auto& entry = it->second;
if (inserted) {
diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h
index d4e44e7c74f..42d124b2bd0 100644
--- a/ydb/core/blob_depot/agent/blob_mapping_cache.h
+++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h
@@ -20,7 +20,7 @@ namespace NKikimr::NBlobDepot {
struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> {
TStringBuf Key;
- std::optional<TValueChain> Values;
+ std::optional<TResolvedValueChain> Values;
bool ResolveInFlight = false;
std::list<TQueryWaitingForKey> QueriesWaitingForKey;
};
@@ -34,7 +34,7 @@ namespace NKikimr::NBlobDepot {
{}
void HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg);
- const TValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context);
+ const TResolvedValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context);
void ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response);
};
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 0028a3690fc..91ccc11fd66 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -29,6 +29,7 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId),
(Msg, msg));
BlobDepotGeneration = msg.GetGeneration();
+ DecommitGroupId = msg.HasDecommitGroupId() ? std::make_optional(msg.GetDecommitGroupId()) : std::nullopt;
THashSet<NKikimrBlobDepot::TChannelKind::E> vanishedKinds;
for (const auto& [kind, _] : ChannelKinds) {
diff --git a/ydb/core/blob_depot/agent/proxy.cpp b/ydb/core/blob_depot/agent/proxy.cpp
index c4e9e8b113b..e1eb120c82c 100644
--- a/ydb/core/blob_depot/agent/proxy.cpp
+++ b/ydb/core/blob_depot/agent/proxy.cpp
@@ -5,7 +5,11 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::SendToProxy(ui32 groupId, std::unique_ptr<IEventBase> event, TRequestSender *sender,
TRequestContext::TPtr context) {
const ui64 id = NextRequestId++;
- SendToBSProxy(SelfId(), groupId, event.release(), id);
+ if (groupId == DecommitGroupId) {
+ Send(ProxyId, event.release(), 0, id);
+ } else {
+ SendToBSProxy(SelfId(), groupId, event.release(), id);
+ }
RegisterRequest(id, sender, std::move(context), {}, false);
}
diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp
index cf6587c1f77..6870cae93f2 100644
--- a/ydb/core/blob_depot/agent/query.cpp
+++ b/ydb/core/blob_depot/agent/query.cpp
@@ -18,6 +18,10 @@ namespace NKikimr::NBlobDepot {
}
}
+ void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) {
+ ev->Get()->Process(this);
+ }
+
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(TAutoPtr<IEventHandle> ev) {
switch (ev->GetTypeRewrite()) {
#define XX(TYPE) \
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index 1d61329ae3d..6849e254f40 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -26,9 +26,9 @@ namespace NKikimr::NBlobDepot {
}
};
- bool TBlobDepotAgent::IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>& values, ui64 offset,
- ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst, TQuery *query,
- ui64 tag, bool vg, TString *error) {
+ bool TBlobDepotAgent::IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TResolvedValueChain>& values,
+ ui64 offset, ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst,
+ TQuery *query, ui64 tag, TString *error) {
ui64 outputOffset = 0;
struct TReadItem {
@@ -41,19 +41,12 @@ namespace NKikimr::NBlobDepot {
std::vector<TReadItem> items;
for (const auto& value : values) {
- if (!value.HasLocator()) {
- *error = "TValueChain.Locator is missing";
- return false;
- }
- const auto& locator = value.GetLocator();
- const ui64 totalDataLen = locator.GetTotalDataLen();
- if (!totalDataLen) {
- *error = "TBlobLocator.TotalDataLen is missing or zero";
- return false;
- }
+ const ui32 groupId = value.GetGroupId();
+ const auto blobId = LogoBlobIDFromLogoBlobID(value.GetBlobId());
const ui64 begin = value.GetSubrangeBegin();
- const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : totalDataLen;
- if (end <= begin || totalDataLen < end) {
+ const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : blobId.BlobSize();
+
+ if (end <= begin || blobId.BlobSize() < end) {
*error = "incorrect SubrangeBegin/SubrangeEnd pair";
return false;
}
@@ -70,18 +63,7 @@ namespace NKikimr::NBlobDepot {
partLen = Min(size ? size : Max<ui64>(), partLen - offset);
Y_VERIFY(partLen);
- auto blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
-
- if (vg) {
- const bool composite = totalDataLen + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize;
- const EBlobType type = composite ? EBlobType::VG_COMPOSITE_BLOB : EBlobType::VG_DATA_BLOB;
- const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0);
- const auto id = blobSeqId.MakeBlobId(TabletId, type, 0, blobSize);
- items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin),
- static_cast<ui32>(partLen), outputOffset});
- } else {
- Y_FAIL();
- }
+ items.push_back(TReadItem{groupId, blobId, ui32(offset + begin), ui32(partLen), outputOffset});
outputOffset += partLen;
offset = 0;
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index c50a8a57758..51cbaa05c9b 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -108,7 +108,7 @@ namespace NKikimr::NBlobDepot {
if (ReadBody) {
TString error;
if (!Agent.IssueRead(item.GetValueChain(), 0, 0, NKikimrBlobStorage::Discover, true, this, 0,
- true, &error)) {
+ &error)) {
return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: "
<< error);
}
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index f710bbfe0f7..f7ff27234d3 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -36,7 +36,7 @@ namespace NKikimr::NBlobDepot {
response.RequestedSize = query.Size;
TString blobId = query.Id.AsBinaryString();
- if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
+ if (const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
std::make_shared<TResolveKeyContext>(i))) {
if (!ProcessSingleResult(i, value)) {
return;
@@ -45,7 +45,7 @@ namespace NKikimr::NBlobDepot {
}
}
- bool ProcessSingleResult(ui32 queryIdx, const TValueChain *value) {
+ bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value) {
auto& msg = GetQuery();
if (!value) {
@@ -57,7 +57,7 @@ namespace NKikimr::NBlobDepot {
} else if (value) {
TString error;
const bool success = Agent.IssueRead(*value, msg.Queries[queryIdx].Shift, msg.Queries[queryIdx].Size,
- msg.GetHandleClass, msg.MustRestoreFirst, this, queryIdx, true, &error);
+ msg.GetHandleClass, msg.MustRestoreFirst, this, queryIdx, &error);
if (!success) {
EndWithError(NKikimrProto::ERROR, std::move(error));
return false;
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index af054706490..670960401ea 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -97,7 +97,7 @@ namespace NKikimr::NBlobDepot {
if (!query.IsIndexOnly) {
TString error;
if (!Agent.IssueRead(key.GetValueChain(), 0, 0, NKikimrBlobStorage::EGetHandleClass::FastRead,
- query.MustRestoreFirst, this, index, true, &error)) {
+ query.MustRestoreFirst, this, index, &error)) {
return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: "
<< error);
}
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index c871995c407..2353d042cd5 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -1,5 +1,7 @@
#include "blob_depot_tablet.h"
#include "assimilator_fetch_machine.h"
+#include "schema.h"
+#include "blocks.h"
namespace NKikimr::NBlobDepot {
@@ -77,7 +79,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "TGroupAssimilator::TEvControllerNodeServiceSetUpdate", (GroupId, GroupId),
- (Msg, ev->Get()->ToString()));
+ (Msg, ev->Get()->Record));
NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId);
auto& record = ev->Get()->Record;
@@ -152,7 +154,40 @@ namespace NKikimr::NBlobDepot {
}
}
- void TBlobDepot::Handle(TEvAssimilatedData::TPtr /*ev*/) {
+ void TBlobDepot::Handle(TEvAssimilatedData::TPtr ev) {
+ class TTxPutAssimilatedData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ std::unique_ptr<TEvAssimilatedData> Ev;
+
+ public:
+ TTxPutAssimilatedData(TBlobDepot *self, TEvAssimilatedData::TPtr ev)
+ : TTransactionBase(self)
+ , Ev(ev->Release().Release())
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+
+ for (const auto& block : Ev->Blocks) {
+ Self->BlocksManager->AddBlockOnLoad(block.TabletId, block.BlockedGeneration, 0);
+ db.Table<Schema::Blocks>().Key(block.TabletId).Update(
+ NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(block.BlockedGeneration),
+ NIceDb::TUpdate<Schema::Blocks::IssuerGuid>(0)
+ );
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ if (!std::exchange(Self->DecommitBlocksFinished, Ev->BlocksFinished)) {
+ STLOG(PRI_INFO, BLOB_DEPOT, BDTxx, "blocks assimilation complete", (TabletId, Self->TabletID()),
+ (DecommitGroupId, Self->Config.GetDecommitGroupId()));
+ Self->ProcessRegisterAgentQ();
+ }
+ }
+ };
+
+ Execute(std::make_unique<TTxPutAssimilatedData>(this, ev));
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 10586eb63b8..3e6e66f77ec 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -153,6 +153,9 @@ namespace NKikimr::NBlobDepot {
static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1);
+ // when in decommission mode and not all blocks are yet recovered, then we postpone agent registration
+ THashMap<TActorId, std::unique_ptr<TEvBlobDepot::TEvRegisterAgent::THandle>> RegisterAgentQ;
+
struct TAgent {
std::optional<TActorId> PipeServerId;
std::optional<TActorId> AgentId;
@@ -198,6 +201,8 @@ namespace NKikimr::NBlobDepot {
void ResetAgent(TAgent& agent);
void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev);
+ void ProcessRegisterAgentQ();
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override {
@@ -220,6 +225,7 @@ namespace NKikimr::NBlobDepot {
void StartOperation() {
InitChannelKinds();
StartGroupAssimilator();
+ ProcessRegisterAgentQ();
}
void OnDetach(const TActorContext&) override {
@@ -272,6 +278,7 @@ namespace NKikimr::NBlobDepot {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Configuration
+ bool Configured = false;
NKikimrBlobDepot::TBlobDepotConfig Config;
void Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev);
@@ -309,6 +316,7 @@ namespace NKikimr::NBlobDepot {
// Group assimilation
TActorId RunningGroupAssimilator;
+ bool DecommitBlocksFinished = false;
class TGroupAssimilator;
class TGroupAssimilatorFetchMachine;
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 06e335af3af..96ce5a4971d 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -42,7 +42,7 @@ namespace NKikimr::NBlobDepot {
void TData::PutKey(TKey key, TValue&& data) {
ui64 referencedBytes = 0;
- EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) {
+ EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) {
if (!RefCount[id]++) {
// first mention of this id
auto& record = GetRecordsPerChannelGroup(id);
@@ -78,7 +78,7 @@ namespace NKikimr::NBlobDepot {
const auto it = Data.find(key);
Y_VERIFY(it != Data.end());
TValue& value = it->second;
- EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id) {
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) {
const auto it = RefCount.find(id);
Y_VERIFY(it != RefCount.end());
if (!--it->second) {
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index bdf96bb06b8..5f106549a32 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -221,7 +221,19 @@ namespace NKikimr::NBlobDepot {
item.SetCookie(*cookie);
}
item.SetKey(key.MakeBinaryKey());
- item.MutableValueChain()->CopyFrom(value.ValueChain);
+ EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) {
+ if (begin != end) {
+ auto *out = item.AddValueChain();
+ out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation()));
+ LogoBlobIDFromLogoBlobID(id, out->MutableBlobId());
+ if (begin) {
+ out->SetSubrangeBegin(begin);
+ }
+ if (end != id.BlobSize()) {
+ out->SetSubrangeEnd(end);
+ }
+ }
+ });
if (value.Meta) {
item.SetMeta(value.Meta.data(), value.Meta.size());
}
diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp
index 7c6f1353ac2..ffdd206091e 100644
--- a/ydb/core/blob_depot/op_apply_config.cpp
+++ b/ydb/core/blob_depot/op_apply_config.cpp
@@ -9,7 +9,6 @@ namespace NKikimr::NBlobDepot {
class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<IEventHandle> Response;
TString ConfigProtobuf;
- bool WasConfigured = false;
public:
TTxApplyConfig(TBlobDepot *self, TEvBlobDepot::TEvApplyConfig& ev, std::unique_ptr<IEventHandle> response,
@@ -33,7 +32,6 @@ namespace NKikimr::NBlobDepot {
if (!table.IsReady()) {
return false;
}
- WasConfigured = table.IsValid() && table.HaveValue<Schema::Config::ConfigProtobuf>();
db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
NIceDb::TUpdate<Schema::Config::ConfigProtobuf>(ConfigProtobuf)
@@ -46,12 +44,12 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT17, "TTxApplyConfig::Complete", (TabletId, Self->TabletID()),
- (WasConfigured, WasConfigured));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT17, "TTxApplyConfig::Complete", (TabletId, Self->TabletID()));
- if (!WasConfigured) {
+ if (!std::exchange(Self->Configured, true)) {
Self->StartOperation();
}
+
TActivationContext::Send(Response.release());
}
};
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index d3947b5ab91..7812213678a 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -8,8 +8,6 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::ExecuteTxLoad() {
class TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- bool Configured = false;
-
public:
TTxLoad(TBlobDepot *self)
: TTransactionBase(self)
@@ -31,8 +29,8 @@ namespace NKikimr::NBlobDepot {
return false;
} else if (table.IsValid()) {
if (table.HaveValue<Schema::Config::ConfigProtobuf>()) {
- Configured = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>());
- Y_VERIFY(Configured);
+ Self->Configured = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>());
+ Y_VERIFY(Self->Configured);
}
}
}
@@ -87,9 +85,9 @@ namespace NKikimr::NBlobDepot {
void Complete(const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "TTxLoad::Complete", (TabletId, Self->TabletID()),
- (Configured, Configured));
+ (Configured, Self->Configured));
- if (Configured) {
+ if (Self->Configured) {
Self->StartOperation();
}
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 7e6fd06c1b8..fcb16cebec0 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -158,6 +158,7 @@ namespace NKikimr::NBlobDepot {
};
using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;
+ using TResolvedValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TResolvedValueChain>;
template<typename TCallback>
void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) {
@@ -165,11 +166,11 @@ namespace NKikimr::NBlobDepot {
const auto& locator = item.GetLocator();
const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) {
- callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()));
- callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()));
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen());
+ callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0);
} else {
callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, locator.GetTotalDataLen() +
- locator.GetFooterLen()));
+ locator.GetFooterLen()), 0, locator.GetTotalDataLen());
}
}
}
diff --git a/ydb/core/blobstorage/base/blobstorage_events.h b/ydb/core/blobstorage/base/blobstorage_events.h
index f2f6a6df620..bd36342fbca 100644
--- a/ydb/core/blobstorage/base/blobstorage_events.h
+++ b/ydb/core/blobstorage/base/blobstorage_events.h
@@ -417,6 +417,14 @@ namespace NKikimr {
struct TEvBlobStorage::TEvBunchOfEvents : TEventLocal<TEvBunchOfEvents, EvBunchOfEvents> {
std::vector<std::unique_ptr<IEventHandle>> Bunch;
+
+ void Process(IActor *actor) {
+ const TActorContext& ctx = TActivationContext::AsActorContext();
+ for (auto& ev : Bunch) {
+ TAutoPtr<IEventHandle> handle(ev.release());
+ actor->Receive(handle, ctx);
+ }
+ }
};
struct TEvBlobStorage::TEvAskRestartPDisk : TEventLocal<TEvAskRestartPDisk, EvAskRestartPDisk> {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index 78e7039aa67..5ceac59dec0 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -132,7 +132,7 @@ NActors::NLog::EPriority PriorityForStatusResult(NKikimrProto::EReplyStatus stat
NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus status);
template<typename TDerived>
-class TBlobStorageGroupRequestActor : public TActorBootstrapped<TDerived> {
+class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActor<TDerived>> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BS_GROUP_REQUEST;
@@ -142,7 +142,8 @@ public:
TIntrusivePtr<TBlobStorageGroupProxyMon> mon, const TActorId& source, ui64 cookie, NWilson::TTraceId traceId,
NKikimrServices::EServiceKikimr logComponent, bool logAccEnabled, TMaybe<TGroupStat::EKind> latencyQueueKind,
TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name)
- : Info(std::move(info))
+ : TActor<TBlobStorageGroupRequestActor<TDerived>>(&TThis::InitialStateFunc, Derived().ActorActivityType())
+ , Info(std::move(info))
, GroupQueues(std::move(groupQueues))
, Mon(std::move(mon))
, PoolCounters(storagePoolCounters)
@@ -161,6 +162,17 @@ public:
.Attribute("RestartCounter", RestartCounter);
}
+ void Registered(TActorSystem *as, const TActorId& parentId) override {
+ ProxyActorId = parentId;
+ as->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0,
+ TActor<TBlobStorageGroupRequestActor<TDerived>>::SelfId(), parentId, nullptr, 0));
+ TActor<TBlobStorageGroupRequestActor<TDerived>>::Registered(as, parentId);
+ }
+
+ STRICT_STFUNC(InitialStateFunc,
+ cFunc(TEvents::TSystem::Bootstrap, Derived().Bootstrap);
+ )
+
template<typename T>
void CountEvent(const T &ev) const {
ERequestType request = TDerived::RequestType();
@@ -258,10 +270,10 @@ public:
}
// make NodeWarden restart the query just after proxy reconfiguration
- const TActorId& proxyId = GetProxyActorId();
Y_VERIFY_DEBUG(RestartCounter < 100);
auto q = self.RestartQuery(RestartCounter + 1);
++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)];
+ const TActorId& proxyId = MakeBlobStorageProxyID(Info->GroupID);
TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span.GetTraceId()));
PassAway();
return true;
@@ -386,6 +398,10 @@ public:
}
}
+ void SendToProxy(std::unique_ptr<IEventBase> event, ui64 cookie = 0, NWilson::TTraceId traceId = {}) {
+ Derived().Send(ProxyActorId, event.release(), 0, cookie, std::move(traceId));
+ }
+
void SendResponseAndDie(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source,
ui64 cookie) {
SendResponse(std::move(ev), timeStats, source, cookie);
@@ -396,15 +412,11 @@ public:
SendResponseAndDie(std::move(ev), timeStats, Source, Cookie);
}
- TActorId GetProxyActorId() const {
- return MakeBlobStorageProxyID(Info->GroupID);
- }
-
void PassAway() override {
// ensure that we are dying for the first time
Y_VERIFY(!std::exchange(Dead, true));
TDerived::ActiveCounter(Mon)->Dec();
- Derived().Send(GetProxyActorId(), new TEvDeathNote(Responsiveness));
+ SendToProxy(std::make_unique<TEvDeathNote>(Responsiveness));
TActorBootstrapped<TDerived>::PassAway();
}
@@ -445,13 +457,12 @@ public:
RequestBytes, GeneratedSubrequests, GeneratedSubrequestBytes, Timer.Passed());
}
- const TActorId proxyId = GetProxyActorId();
if (timeStats) {
- Derived().Send(proxyId, new TEvTimeStats(std::move(*timeStats)));
+ SendToProxy(std::make_unique<TEvTimeStats>(std::move(*timeStats)));
}
if (LatencyQueueKind) {
- Derived().Send(proxyId, new TEvLatencyReport(*LatencyQueueKind, now - RequestStartTime));
+ SendToProxy(std::make_unique<TEvLatencyReport>(*LatencyQueueKind, now - RequestStartTime));
}
// KIKIMR-6737
@@ -501,6 +512,8 @@ private:
}
protected:
+ using TThis = TDerived;
+
TIntrusivePtr<TBlobStorageGroupInfo> Info;
TIntrusivePtr<TGroupQueues> GroupQueues;
TIntrusivePtr<TBlobStorageGroupProxyMon> Mon;
@@ -527,6 +540,7 @@ private:
THPTimer Timer;
std::deque<std::unique_ptr<IEventHandle>> PostponedQ;
TBlobStorageGroupInfo::TGroupFailDomains RacingDomains; // a set of domains we've received RACE from
+ TActorId ProxyActorId;
};
void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id,
@@ -568,7 +582,7 @@ IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroup
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvPatch *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
- const TActorId &proxyId, bool useVPatch);
+ bool useVPatch);
IActor* CreateBlobStorageGroupMultiGetRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
index 16a4730e535..3cded7156d5 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
@@ -602,8 +602,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
getRequest->IsInternal = true;
getRequest->TabletId = TabletId;
getRequest->AcquireBlockedGeneration = true;
- bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span.GetTraceId());
- Y_VERIFY(isSent);
+ SendToProxy(std::move(getRequest), 0, Span.GetTraceId());
TotalSent++;
A_LOG_DEBUG_S("BSD10", "Sent EvGet logoBlobId# " << logoBlobId.ToString());
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
index 590a92bdf0f..7aa610589a0 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
@@ -570,7 +570,7 @@ public:
A_LOG_DEBUG_S("DSPDM17", "sending TEvGet# " << query->ToString());
- SendToBSProxy(SelfId(), Info->GroupID, query.release());
+ SendToProxy(std::move(query));
++RequestsInFlight;
} else {
GetFinished = true;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
index 5fdc6c30d90..9ab9a472051 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
@@ -286,7 +286,7 @@ public:
// we have to process this blob
auto query = std::make_unique<TEvBlobStorage::TEvGet>(id, 0, 0, Deadline, HandleClass, true, !ReadBody, ForceBlockedGeneration);
query->IsInternal = true;
- SendToBSProxy(SelfId(), Info->GroupID, query.release());
+ SendToProxy(std::move(query));
GetIssuedFor = id;
return;
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
index fd4b5e5b5e4..e03405a42c3 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
@@ -258,15 +258,6 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
void Handle(TEvDeathNote::TPtr ev);
- template<typename TEvent>
- void HandleCheckAssimilator(TAutoPtr<TEventHandle<TEvent>>& ev) {
- if (/*Info->AssimilatorGroupId*/ false) {
- //TActivationContext::Send(ev->Forward(MakeBlobStorageProxyID(*Info->AssimilatorGroupId)));
- } else {
- return HandleNormal(ev);
- }
- }
-
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Error state
@@ -405,7 +396,7 @@ public:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
- HANDLE_EVENTS(HandleCheckAssimilator);
+ HANDLE_EVENTS(HandleNormal);
default: return StateCommon(ev, ctx);
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
index 1b4868cf265..f3be4f5dd48 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
@@ -168,7 +168,7 @@ class TBlobStorageGroupIndexRestoreGetRequest
<< " recoverable blob, id# " << Queries[idx].Id.ToString()
<< " BlobStatus# " << DumpBlobStatus(idx)
<< " sending EvGet");
- SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0);
+ SendToProxy(std::move(get));
RestoreQueriesStarted++;
} else if (blobState == TBlobStorageGroupInfo::EBS_FULL) {
a.Status = NKikimrProto::OK;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
index fa59aff5c1e..d338291c6f9 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
@@ -155,7 +155,7 @@ public:
R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx
<< " isLast# " << isLast
<< " ev# " << ev->ToString());
- SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId());
+ SendToProxy(std::move(ev), cookie, Span.GetTraceId());
if (isLast) {
CollectRequestsInFlight++;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
index 7b73f153a35..9b2f53f4c09 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
@@ -122,7 +122,7 @@ public:
void SendRequests() {
for (; RequestsInFlight < MaxRequestsInFlight && !PendingGets.empty(); ++RequestsInFlight, PendingGets.pop_front()) {
auto& [ev, cookie] = PendingGets.front();
- SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId());
+ SendToProxy(std::move(ev), cookie, Span.GetTraceId());
}
if (!RequestsInFlight && PendingGets.empty()) {
auto ev = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, 0, Info->GroupID);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
index 9dbba7dd853..d0c5842904a 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
@@ -33,7 +33,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
static constexpr ui32 TypicalMaxPartsCount = TypicalPartPlacementCount * TypicalPartsInBlob;
TString Buffer;
- TActorId ProxyActorId;
ui32 OriginalGroupId;
TLogoBlobID OriginalId;
@@ -90,11 +89,10 @@ public:
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvPatch *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
- const TActorId &proxyId, bool useVPatch = false)
+ bool useVPatch = false)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_PATCH, false, {}, now, storagePoolCounters,
ev->RestartCounter, "DSProxy.Patch")
- , ProxyActorId(proxyId)
, OriginalGroupId(ev->OriginalGroupId)
, OriginalId(ev->OriginalId)
, PatchedId(ev->PatchedId)
@@ -172,7 +170,7 @@ public:
std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline,
NKikimrBlobStorage::AsyncBlob, TEvBlobStorage::TEvPut::TacticDefault);
put->Orbit = std::move(Orbit);
- Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span.GetTraceId());
+ SendToProxy(std::move(put), OriginalId.Hash(), Span.GetTraceId());
}
void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev) {
@@ -528,7 +526,7 @@ public:
NKikimrBlobStorage::AsyncRead);
get->Orbit = std::move(Orbit);
if (OriginalGroupId == Info->GroupID) {
- Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span.GetTraceId());
+ SendToProxy(std::move(get), PatchedId.Hash(), Span.GetTraceId());
} else {
SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span.GetTraceId());
}
@@ -824,9 +822,9 @@ IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroup
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvPatch *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
- const TActorId &proxyId, bool useVPatch) {
- return new TBlobStorageGroupPatchRequest(info, state, source, mon, ev, cookie, std::move(traceId),
- now, storagePoolCounters, proxyId, useVPatch);
+ bool useVPatch) {
+ return new TBlobStorageGroupPatchRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
+ storagePoolCounters, useVPatch);
}
}//NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
index c83f46b1080..a0e20065d1f 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
@@ -250,7 +250,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
A_LOG_DEBUG_S("DSR08", "sending TEvGet# " << get->ToString());
- SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span.GetTraceId());
+ SendToProxy(std::move(get), 0, Span.GetTraceId());
// switch state
Become(&TThis::StateGet);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
index 3ef82b2c926..72a6b091049 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
@@ -161,7 +161,7 @@ namespace NKikimr {
const TActorId reqId = Register(
CreateBlobStorageGroupPatchRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
ev->Get(), ev->Cookie, std::move(ev->TraceId), now,
- StoragePoolCounters, SelfId(), EnableVPatch.Update(now)));
+ StoragePoolCounters, EnableVPatch.Update(now)));
ActiveRequests.insert(reqId);
}
@@ -243,11 +243,7 @@ namespace NKikimr {
}
void TBlobStorageGroupProxy::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) {
- const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId());
- for (auto& ev : ev->Get()->Bunch) {
- TAutoPtr<IEventHandle> handle(ev.release());
- Receive(handle, ctx);
- }
+ ev->Get()->Process(this);
}
void TBlobStorageGroupProxy::ProcessBatchedPutRequests(TBatchedQueue<TEvBlobStorage::TEvPut::TPtr> &batchedPuts,
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h
index 2167005b968..3c532929f8a 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h
@@ -127,7 +127,7 @@ struct TDSProxyEnv {
std::unique_ptr<IActor> CreatePatchRequestActor(TEvBlobStorage::TEvPatch::TPtr &ev, bool useVPatch = false) {
return std::unique_ptr<IActor>(CreateBlobStorageGroupPatchRequest(Info, GroupQueues, ev->Sender, Mon,
ev->Get(), ev->Cookie, std::move(ev->TraceId), TInstant::Now(), StoragePoolCounters,
- FakeProxyActorId, useVPatch));
+ useVPatch));
}
};
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
index aaf73779b16..0e96d053298 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
@@ -530,7 +530,7 @@ void RunNaivePatchTest(TTestBasicRuntime &runtime, const TTestArgs &args, ENaive
env.Configure(runtime, args.GType, args.CurrentGroupId, 0);
TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args);
std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, false);
- runtime.Register(patchActor.release());
+ runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId);
ConductNaivePatch(runtime, args, naiveCase);
}
@@ -539,7 +539,7 @@ void RunMovedPatchTest(TTestBasicRuntime &runtime, const TTestArgs &args, EMoved
env.Configure(runtime, args.GType, args.CurrentGroupId, 0);
TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args);
std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true);
- runtime.Register(patchActor.release());
+ runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId);
ConductMovedPatch(runtime, env, args, movedCase);
}
@@ -548,7 +548,7 @@ void RunVPatchTest(TTestBasicRuntime &runtime, const TTestArgs &args, EVPatchCas
env.Configure(runtime, args.GType, args.CurrentGroupId, 0);
TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args);
std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true);
- runtime.Register(patchActor.release());
+ runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId);
ConductVPatch(runtime, env, args, vpatchCase);
}
@@ -742,7 +742,7 @@ void RunFaultToleranceBlock4Plus2(TTestBasicRuntime &runtime, const TTestArgs &a
env.Configure(runtime, args.GType, args.CurrentGroupId, 0);
TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args);
std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true);
- runtime.Register(patchActor.release());
+ runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId);
ConductFaultTolerance(runtime, env, args, GetFaultToleranceCaseForBlock4Plus2(env, args));
}
@@ -751,7 +751,7 @@ void RunFaultToleranceMirror3dc(TTestBasicRuntime &runtime, const TTestArgs &arg
env.Configure(runtime, args.GType, args.CurrentGroupId, 0);
TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args);
std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true);
- runtime.Register(patchActor.release());
+ runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId);
ConductFaultTolerance(runtime, env, args, GetFaultToleranceCaseForMirror3dc(env, args));
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
index cd2072cbc74..d3ca8aa7da9 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
@@ -189,8 +189,8 @@ namespace NKikimr::NStorage {
// for group/proxy and ask BSC for group info
group.Info.Reset();
RequestGroupConfig(groupId, group);
- if (group.ProxyRunning) {
- Send(MakeBlobStorageProxyID(groupId), new TEvBlobStorage::TEvConfigureProxy(nullptr));
+ if (group.ProxyId) {
+ Send(group.ProxyId, new TEvBlobStorage::TEvConfigureProxy(nullptr));
}
} else if (groupChanged) {
// group has changed; obtain main encryption key for this group and try to parse group info from the protobuf
@@ -202,13 +202,22 @@ namespace NKikimr::NStorage {
STLOG(PRI_ERROR, BS_NODE, NW19, "error while parsing group", (GroupId, groupId), (Err, s));
}
- if (group.ProxyRunning) { // update configuration for running proxies
+ if (group.ProxyId) { // update configuration for running proxies
auto info = NeedGroupInfo(groupId);
auto counters = info
? DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType())
: nullptr;
- Send(MakeBlobStorageProxyID(groupId), new TEvBlobStorage::TEvConfigureProxy(std::move(info),
- std::move(counters)));
+
+ if (info && info->BlobDepotId && !group.AgentProxy) {
+ // re-register proxy as an agent
+ group.AgentProxy = true;
+ TActorSystem *as = TActivationContext::ActorSystem();
+ group.ProxyId = Register(NBlobDepot::CreateBlobDepotAgent(groupId, info, group.ProxyId),
+ TMailboxType::ReadAsFilled, AppData()->SystemPoolId);
+ as->RegisterLocalService(MakeBlobStorageProxyID(groupId), group.ProxyId);
+ } else {
+ Send(group.ProxyId, new TEvBlobStorage::TEvConfigureProxy(std::move(info), std::move(counters)));
+ }
}
if (const auto& info = group.Info) {
@@ -238,9 +247,9 @@ namespace NKikimr::NStorage {
if (group.GetEntityStatus() == NKikimrBlobStorage::DESTROY) {
if (EjectedGroups.insert(groupId).second) {
TGroupRecord& group = Groups[groupId];
- STLOG(PRI_DEBUG, BS_NODE, NW99, "destroying group", (GroupId, groupId), (ProxyRunning, group.ProxyRunning));
- if (group.ProxyRunning) {
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, MakeBlobStorageProxyID(groupId), {}, nullptr, 0));
+ STLOG(PRI_DEBUG, BS_NODE, NW99, "destroying group", (GroupId, groupId), (ProxyId, group.ProxyId));
+ if (group.ProxyId) {
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, group.ProxyId, {}, nullptr, 0));
}
if (group.GroupResolver) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, group.GroupResolver, {}, nullptr, 0));
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 9b72c8dfdc1..53d8f837720 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -348,7 +348,8 @@ namespace NKikimr::NStorage {
ui32 MaxKnownGeneration = 0; // maximum seen generation
std::optional<NKikimrBlobStorage::TGroupInfo> Group; // group info as a protobuf
NKikimrBlobStorage::TGroupInfo EncryptionParams; // latest encryption parameters; set only when encryption enabled; overlay in respect to Group
- bool ProxyRunning = false;
+ TActorId ProxyId; // actor id of running DS proxy or agent
+ bool AgentProxy = false; // was the group started as an BlobDepot agent proxy?
bool GetGroupRequestPending = false; // if true, then we are waiting for GetGroup response for this group
bool ProposeRequestPending = false; // if true, then we have sent ProposeKey request and waiting for the group
TActorId GroupResolver; // resolver actor id
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp
index 691a15f2ee0..8d29bae0dda 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp
@@ -58,7 +58,7 @@ void TNodeWarden::RenderJsonGroupInfo(IOutputStream& out, const std::set<ui32>&
groupInfo["Status"] = "ejected";
} else {
TGroupRecord& group = Groups[groupId];
- groupInfo["Status"] = group.ProxyRunning ? "started" : "stopped";
+ groupInfo["Status"] = group.ProxyId ? "started" : "stopped";
if (const auto& info = group.Info) {
groupInfo["Generation"] = info->GroupGeneration;
@@ -220,7 +220,7 @@ void TNodeWarden::RenderDSProxies(IOutputStream& out) {
ui32 numStarted = 0, numEjected = EjectedGroups.size();
for (const auto& [groupId, group] : Groups) {
- numStarted += group.ProxyRunning;
+ numStarted += bool(group.ProxyId);
}
H3() { out << "Started DSProxies"; }
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp b/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp
index e618cf2d553..37660519dbf 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp
@@ -63,7 +63,8 @@ void TNodeWarden::SendRegisterNode() {
TVector<ui32> startedDynamicGroups, generations;
for (const auto& [groupId, group] : Groups) {
- if (group.ProxyRunning && TGroupID(groupId).ConfigurationType() == EGroupConfigurationType::Dynamic) {
+ if (group.ProxyId && TGroupID(groupId).ConfigurationType() == EGroupConfigurationType::Dynamic &&
+ (!group.Info || group.Info->DecommitStatus != NKikimrBlobStorage::TGroupDecommitStatus::DONE)) {
startedDynamicGroups.push_back(groupId);
generations.push_back(group.Info ? group.Info->GroupGeneration : 0);
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp
index faea46d7334..f44a38d52a5 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp
@@ -12,36 +12,61 @@ void TNodeWarden::StartLocalProxy(ui32 groupId) {
STLOG(PRI_DEBUG, BS_NODE, NW12, "StartLocalProxy", (GroupId, groupId));
std::unique_ptr<IActor> proxy;
+ TActorSystem *as = TActivationContext::ActorSystem();
+
+ TGroupRecord& group = Groups[groupId];
+
+ auto getCounters = [&](const TIntrusivePtr<TBlobStorageGroupInfo>& info) {
+ return DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType());
+ };
if (EnableProxyMock) {
// create mock proxy
proxy.reset(CreateBlobStorageGroupProxyMockActor());
} else if (auto info = NeedGroupInfo(groupId)) {
- // create proxy with configuration
- auto counters = DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType());
- proxy.reset(CreateBlobStorageGroupProxyConfigured(std::move(info), false, DsProxyNodeMon, std::move(counters),
- EnablePutBatching, EnableVPatch));
+ if (info->BlobDepotId) {
+ TActorId proxyActorId;
+
+ switch (info->DecommitStatus) {
+ case NKikimrBlobStorage::TGroupDecommitStatus::NONE:
+ case NKikimrBlobStorage::TGroupDecommitStatus::PENDING:
+ Y_FAIL("unexpected DecommitStatus for dynamic group with bound BlobDepotId");
+
+ case NKikimrBlobStorage::TGroupDecommitStatus::IN_PROGRESS:
+ // create proxy that will be used by blob depot agent to fetch underlying data
+ proxyActorId = as->Register(CreateBlobStorageGroupProxyConfigured(
+ TIntrusivePtr<TBlobStorageGroupInfo>(info), false, DsProxyNodeMon,
+ getCounters(info), EnablePutBatching, EnableVPatch), TMailboxType::ReadAsFilled,
+ AppData()->SystemPoolId);
+ [[fallthrough]];
+ case NKikimrBlobStorage::TGroupDecommitStatus::DONE:
+ proxy.reset(NBlobDepot::CreateBlobDepotAgent(groupId, info, proxyActorId));
+ break;
+ }
+ } else {
+ // create proxy with configuration
+ proxy.reset(CreateBlobStorageGroupProxyConfigured(std::move(info), false, DsProxyNodeMon, getCounters(info),
+ EnablePutBatching, EnableVPatch));
+ }
} else {
// create proxy without configuration
proxy.reset(CreateBlobStorageGroupProxyUnconfigured(groupId, DsProxyNodeMon, EnablePutBatching, EnableVPatch));
}
- TActorSystem *as = TActivationContext::ActorSystem();
- as->RegisterLocalService(MakeBlobStorageProxyID(groupId), as->Register(proxy.release(), TMailboxType::ReadAsFilled,
- AppData()->SystemPoolId));
+ group.ProxyId = as->Register(proxy.release(), TMailboxType::ReadAsFilled, AppData()->SystemPoolId);
+ as->RegisterLocalService(MakeBlobStorageProxyID(groupId), group.ProxyId);
}
void TNodeWarden::StartVirtualGroupAgent(ui32 groupId) {
STLOG(PRI_DEBUG, BS_NODE, NW40, "StartVirtualGroupProxy", (GroupId, groupId));
TActorSystem *as = TActivationContext::ActorSystem();
- const TActorId actorId = as->Register(NBlobDepot::CreateBlobDepotAgent(groupId), TMailboxType::ReadAsFilled,
- AppData()->SystemPoolId);
- if (auto info = NeedGroupInfo(groupId)) {
- auto counters = DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType());
- Send(actorId, new TEvBlobStorage::TEvConfigureProxy(std::move(info), std::move(counters)));
- }
- as->RegisterLocalService(MakeBlobStorageProxyID(groupId), actorId);
+ TGroupRecord& group = Groups[groupId];
+ auto info = NeedGroupInfo(groupId);
+ group.ProxyId = as->Register(NBlobDepot::CreateBlobDepotAgent(groupId, std::move(info), {}),
+ TMailboxType::ReadAsFilled, AppData()->SystemPoolId);
+ group.AgentProxy = true;
+ as->RegisterLocalService(MakeBlobStorageProxyID(groupId), group.ProxyId);
}
void TNodeWarden::StartStaticProxies() {
@@ -64,8 +89,7 @@ void TNodeWarden::HandleForwarded(TAutoPtr<::NActors::IEventHandle> &ev) {
TActivationContext::Send(ev->Forward(errorProxy));
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, errorProxy, {}, nullptr, 0));
return;
- } else if (TGroupRecord& group = Groups[id]; !group.ProxyRunning) {
- group.ProxyRunning = true;
+ } else if (TGroupRecord& group = Groups[id]; !group.ProxyId) {
if (TGroupID(id).ConfigurationType() == EGroupConfigurationType::Virtual) {
StartVirtualGroupAgent(id);
} else {
@@ -79,7 +103,7 @@ void TNodeWarden::HandleForwarded(TAutoPtr<::NActors::IEventHandle> &ev) {
void TNodeWarden::Handle(NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate::TPtr ev) {
const auto& record = ev->Get()->Record;
const ui32 groupId = record.GetGroupID();
- if (const auto it = Groups.find(groupId); it != Groups.end() && it->second.ProxyRunning) {
+ if (const auto it = Groups.find(groupId); it != Groups.end() && it->second.ProxyId) {
TActivationContext::Send(ev->Forward(WhiteboardId));
}
}
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 7cf3743c679..ac923ff0dfb 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -47,6 +47,12 @@ message TGivenIdRange {
repeated TChannelRange ChannelRanges = 1;
}
+message TResolvedValueChain {
+ optional uint32 GroupId = 1;
+ optional NKikimrProto.TLogoBlobID BlobId = 2;
+ optional uint32 SubrangeBegin = 3;
+ optional uint32 SubrangeEnd = 4;
+}
@@ -190,7 +196,7 @@ message TEvResolveResult {
message TResolvedKey {
optional uint64 Cookie = 1;
optional bytes Key = 2;
- repeated TValueChain ValueChain = 3;
+ repeated TResolvedValueChain ValueChain = 3;
optional bytes Meta = 4;
repeated uint64 Owners = 5;
}