diff options
author | alexvru <[email protected]> | 2022-08-08 14:13:50 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2022-08-08 14:13:50 +0300 |
commit | 34d70b4d5019c91fed33d3aab2cdda46b61dd734 (patch) | |
tree | 9f25bdec505dcceca0ce9ca194501662b9eebd86 | |
parent | 203ba5ba0339b44f13dea1b1d03058a541e3d6e7 (diff) |
Support group decommission machinery
40 files changed, 286 insertions, 155 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index cc731030d48..449b76fb457 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -13,6 +13,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "TEvServerDisconnected", (TabletId, TabletID()), (PipeServerId, ev->Get()->ServerId)); + const auto it = PipeServerToNode.find(ev->Get()->ServerId); Y_VERIFY(it != PipeServerToNode.end()); if (const auto& nodeId = it->second) { @@ -27,6 +28,8 @@ namespace NKikimr::NBlobDepot { } } PipeServerToNode.erase(it); + + RegisterAgentQ.erase(ev->Get()->ServerId); } void TBlobDepot::OnAgentDisconnect(TAgent& agent) { @@ -35,9 +38,16 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { + if (!Configured || (Config.HasDecommitGroupId() && !DecommitBlocksFinished)) { + const auto [it, inserted] = RegisterAgentQ.emplace(ev->Recipient, ev.Release()); + Y_VERIFY(inserted); + return; + } + const ui32 nodeId = ev->Sender.NodeId(); const TActorId& pipeServerId = ev->Recipient; const auto& req = ev->Get()->Record; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, req), (NodeId, nodeId), (PipeServerId, pipeServerId), (Id, ev->Cookie)); @@ -267,4 +277,15 @@ namespace NKikimr::NBlobDepot { Execute(std::make_unique<TTxInvokeCallback>(this, ev)); } + void TBlobDepot::ProcessRegisterAgentQ() { + if (!Configured || (Config.HasDecommitGroupId() && !DecommitBlocksFinished)) { + return; + } + + for (auto& [pipeServerId, ev] : std::exchange(RegisterAgentQ, {})) { + TAutoPtr<IEventHandle> tmp(ev.release()); + Receive(tmp, TActivationContext::AsActorContext()); + } + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index c4f56e58a02..b16abfc0dc9 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -5,24 +5,35 @@ namespace NKikimr::NBlobDepot { - TBlobDepotAgent::TBlobDepotAgent(ui32 virtualGroupId) - : TActor(&TThis::StateFunc) - , TRequestSender(*this) + TBlobDepotAgent::TBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) + : TRequestSender(*this) , VirtualGroupId(virtualGroupId) + , ProxyId(proxyId) , AgentInstanceId(RandomNumber<ui64>()) , BlocksManagerPtr(new TBlocksManager(*this)) , BlocksManager(*BlocksManagerPtr) , BlobMappingCachePtr(new TBlobMappingCache(*this)) , BlobMappingCache(*BlobMappingCachePtr) { - Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual); + if (info) { + Y_VERIFY(info->BlobDepotId); + TabletId = *info->BlobDepotId; + } } TBlobDepotAgent::~TBlobDepotAgent() {} - IActor *CreateBlobDepotAgent(ui32 virtualGroupId) { - return new TBlobDepotAgent(virtualGroupId); + void TBlobDepotAgent::Bootstrap() { + Become(&TThis::StateFunc); + + if (TabletId) { + ConnectToBlobDepot(); + } + } + + IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) { + return new TBlobDepotAgent(virtualGroupId, std::move(info), proxyId); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent.h b/ydb/core/blob_depot/agent/agent.h index 4d66f9967b4..3eb49f8f5bb 100644 --- a/ydb/core/blob_depot/agent/agent.h +++ b/ydb/core/blob_depot/agent/agent.h @@ -4,6 +4,6 @@ namespace NKikimr::NBlobDepot { - IActor *CreateBlobDepotAgent(ui32 virtualGroupId); + IActor *CreateBlobDepotAgent(ui32 groupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId); } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 28fc2b71179..25e1b937020 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -30,10 +30,8 @@ namespace NKikimr::NBlobDepot { using TPtr = std::shared_ptr<TRequestContext>; }; - using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; - struct TTabletDisconnected {}; - struct TKeyResolved { const TValueChain* ValueChain; }; + struct TKeyResolved { const TResolvedValueChain* ValueChain; }; class TRequestSender { THashMap<ui64, TRequestContext::TPtr> RequestsInFlight; @@ -74,18 +72,21 @@ namespace NKikimr::NBlobDepot { }; class TBlobDepotAgent - : public TActor<TBlobDepotAgent> + : public TActorBootstrapped<TBlobDepotAgent> , public TRequestSender { const ui32 VirtualGroupId; + const TActorId ProxyId; const ui64 AgentInstanceId; ui64 TabletId = Max<ui64>(); TActorId PipeId; public: - TBlobDepotAgent(ui32 virtualGroupId); + TBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId); ~TBlobDepotAgent(); + void Bootstrap(); + #define FORWARD_STORAGE_PROXY(TYPE) fFunc(TEvBlobStorage::TYPE, HandleStorageProxy); STRICT_STFUNC(StateFunc, cFunc(TEvents::TSystem::Poison, PassAway); @@ -108,6 +109,7 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobStorage::TEvPutResult, HandleOtherResponse); ENUMERATE_INCOMING_EVENTS(FORWARD_STORAGE_PROXY) + hFunc(TEvBlobStorage::TEvBunchOfEvents, Handle); ); #undef FORWARD_STORAGE_PROXY @@ -120,14 +122,18 @@ namespace NKikimr::NBlobDepot { const auto& info = ev->Get()->Info; Y_VERIFY(info); Y_VERIFY(info->BlobDepotId); - TabletId = *info->BlobDepotId; - if (TabletId) { - ConnectToBlobDepot(); - } + if (TabletId != *info->BlobDepotId) { + TabletId = *info->BlobDepotId; + if (TabletId) { + ConnectToBlobDepot(); + } - for (auto& ev : std::exchange(PendingEventQ, {})) { - TActivationContext::Send(ev.release()); + for (auto& ev : std::exchange(PendingEventQ, {})) { + TActivationContext::Send(ev.release()); + } } + + TActivationContext::Send(ev->Forward(ProxyId)); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -168,6 +174,7 @@ namespace NKikimr::NBlobDepot { }; ui32 BlobDepotGeneration = 0; + std::optional<ui32> DecommitGroupId; void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); @@ -233,6 +240,7 @@ namespace NKikimr::NBlobDepot { TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> ExecutingQueries; void HandleStorageProxy(TAutoPtr<IEventHandle> ev); + void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); TQuery *CreateQuery(TAutoPtr<IEventHandle> ev); template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev); @@ -293,8 +301,8 @@ namespace NKikimr::NBlobDepot { struct TReadContext; - bool IssueRead(const TValueChain& values, ui64 offset, ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, - bool mustRestoreFirst, TQuery *query, ui64 tag, bool vg, TString *error); + bool IssueRead(const TResolvedValueChain& values, ui64 offset, ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, + bool mustRestoreFirst, TQuery *query, ui64 tag, TString *error); void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg); diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp index 479cb057bdd..71b1d822d82 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp @@ -21,7 +21,8 @@ namespace NKikimr::NBlobDepot { } } - const TValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context) { + const TResolvedValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query, + TRequestContext::TPtr context) { const auto [it, inserted] = Cache.try_emplace(std::move(key)); auto& entry = it->second; if (inserted) { diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h index d4e44e7c74f..42d124b2bd0 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.h +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h @@ -20,7 +20,7 @@ namespace NKikimr::NBlobDepot { struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> { TStringBuf Key; - std::optional<TValueChain> Values; + std::optional<TResolvedValueChain> Values; bool ResolveInFlight = false; std::list<TQueryWaitingForKey> QueriesWaitingForKey; }; @@ -34,7 +34,7 @@ namespace NKikimr::NBlobDepot { {} void HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg); - const TValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context); + const TResolvedValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context); void ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response); }; diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 0028a3690fc..91ccc11fd66 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -29,6 +29,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), (Msg, msg)); BlobDepotGeneration = msg.GetGeneration(); + DecommitGroupId = msg.HasDecommitGroupId() ? std::make_optional(msg.GetDecommitGroupId()) : std::nullopt; THashSet<NKikimrBlobDepot::TChannelKind::E> vanishedKinds; for (const auto& [kind, _] : ChannelKinds) { diff --git a/ydb/core/blob_depot/agent/proxy.cpp b/ydb/core/blob_depot/agent/proxy.cpp index c4e9e8b113b..e1eb120c82c 100644 --- a/ydb/core/blob_depot/agent/proxy.cpp +++ b/ydb/core/blob_depot/agent/proxy.cpp @@ -5,7 +5,11 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::SendToProxy(ui32 groupId, std::unique_ptr<IEventBase> event, TRequestSender *sender, TRequestContext::TPtr context) { const ui64 id = NextRequestId++; - SendToBSProxy(SelfId(), groupId, event.release(), id); + if (groupId == DecommitGroupId) { + Send(ProxyId, event.release(), 0, id); + } else { + SendToBSProxy(SelfId(), groupId, event.release(), id); + } RegisterRequest(id, sender, std::move(context), {}, false); } diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index cf6587c1f77..6870cae93f2 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -18,6 +18,10 @@ namespace NKikimr::NBlobDepot { } } + void TBlobDepotAgent::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) { + ev->Get()->Process(this); + } + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(TAutoPtr<IEventHandle> ev) { switch (ev->GetTypeRewrite()) { #define XX(TYPE) \ diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index 1d61329ae3d..6849e254f40 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -26,9 +26,9 @@ namespace NKikimr::NBlobDepot { } }; - bool TBlobDepotAgent::IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>& values, ui64 offset, - ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst, TQuery *query, - ui64 tag, bool vg, TString *error) { + bool TBlobDepotAgent::IssueRead(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TResolvedValueChain>& values, + ui64 offset, ui64 size, NKikimrBlobStorage::EGetHandleClass getHandleClass, bool mustRestoreFirst, + TQuery *query, ui64 tag, TString *error) { ui64 outputOffset = 0; struct TReadItem { @@ -41,19 +41,12 @@ namespace NKikimr::NBlobDepot { std::vector<TReadItem> items; for (const auto& value : values) { - if (!value.HasLocator()) { - *error = "TValueChain.Locator is missing"; - return false; - } - const auto& locator = value.GetLocator(); - const ui64 totalDataLen = locator.GetTotalDataLen(); - if (!totalDataLen) { - *error = "TBlobLocator.TotalDataLen is missing or zero"; - return false; - } + const ui32 groupId = value.GetGroupId(); + const auto blobId = LogoBlobIDFromLogoBlobID(value.GetBlobId()); const ui64 begin = value.GetSubrangeBegin(); - const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : totalDataLen; - if (end <= begin || totalDataLen < end) { + const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : blobId.BlobSize(); + + if (end <= begin || blobId.BlobSize() < end) { *error = "incorrect SubrangeBegin/SubrangeEnd pair"; return false; } @@ -70,18 +63,7 @@ namespace NKikimr::NBlobDepot { partLen = Min(size ? size : Max<ui64>(), partLen - offset); Y_VERIFY(partLen); - auto blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); - - if (vg) { - const bool composite = totalDataLen + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize; - const EBlobType type = composite ? EBlobType::VG_COMPOSITE_BLOB : EBlobType::VG_DATA_BLOB; - const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0); - const auto id = blobSeqId.MakeBlobId(TabletId, type, 0, blobSize); - items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin), - static_cast<ui32>(partLen), outputOffset}); - } else { - Y_FAIL(); - } + items.push_back(TReadItem{groupId, blobId, ui32(offset + begin), ui32(partLen), outputOffset}); outputOffset += partLen; offset = 0; diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index c50a8a57758..51cbaa05c9b 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -108,7 +108,7 @@ namespace NKikimr::NBlobDepot { if (ReadBody) { TString error; if (!Agent.IssueRead(item.GetValueChain(), 0, 0, NKikimrBlobStorage::Discover, true, this, 0, - true, &error)) { + &error)) { return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: " << error); } diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index f710bbfe0f7..f7ff27234d3 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -36,7 +36,7 @@ namespace NKikimr::NBlobDepot { response.RequestedSize = query.Size; TString blobId = query.Id.AsBinaryString(); - if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, + if (const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, std::make_shared<TResolveKeyContext>(i))) { if (!ProcessSingleResult(i, value)) { return; @@ -45,7 +45,7 @@ namespace NKikimr::NBlobDepot { } } - bool ProcessSingleResult(ui32 queryIdx, const TValueChain *value) { + bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value) { auto& msg = GetQuery(); if (!value) { @@ -57,7 +57,7 @@ namespace NKikimr::NBlobDepot { } else if (value) { TString error; const bool success = Agent.IssueRead(*value, msg.Queries[queryIdx].Shift, msg.Queries[queryIdx].Size, - msg.GetHandleClass, msg.MustRestoreFirst, this, queryIdx, true, &error); + msg.GetHandleClass, msg.MustRestoreFirst, this, queryIdx, &error); if (!success) { EndWithError(NKikimrProto::ERROR, std::move(error)); return false; diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index af054706490..670960401ea 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -97,7 +97,7 @@ namespace NKikimr::NBlobDepot { if (!query.IsIndexOnly) { TString error; if (!Agent.IssueRead(key.GetValueChain(), 0, 0, NKikimrBlobStorage::EGetHandleClass::FastRead, - query.MustRestoreFirst, this, index, true, &error)) { + query.MustRestoreFirst, this, index, &error)) { return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: " << error); } diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index c871995c407..2353d042cd5 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -1,5 +1,7 @@ #include "blob_depot_tablet.h" #include "assimilator_fetch_machine.h" +#include "schema.h" +#include "blocks.h" namespace NKikimr::NBlobDepot { @@ -77,7 +79,7 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "TGroupAssimilator::TEvControllerNodeServiceSetUpdate", (GroupId, GroupId), - (Msg, ev->Get()->ToString())); + (Msg, ev->Get()->Record)); NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId); auto& record = ev->Get()->Record; @@ -152,7 +154,40 @@ namespace NKikimr::NBlobDepot { } } - void TBlobDepot::Handle(TEvAssimilatedData::TPtr /*ev*/) { + void TBlobDepot::Handle(TEvAssimilatedData::TPtr ev) { + class TTxPutAssimilatedData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + std::unique_ptr<TEvAssimilatedData> Ev; + + public: + TTxPutAssimilatedData(TBlobDepot *self, TEvAssimilatedData::TPtr ev) + : TTransactionBase(self) + , Ev(ev->Release().Release()) + {} + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + + for (const auto& block : Ev->Blocks) { + Self->BlocksManager->AddBlockOnLoad(block.TabletId, block.BlockedGeneration, 0); + db.Table<Schema::Blocks>().Key(block.TabletId).Update( + NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(block.BlockedGeneration), + NIceDb::TUpdate<Schema::Blocks::IssuerGuid>(0) + ); + } + + return true; + } + + void Complete(const TActorContext&) override { + if (!std::exchange(Self->DecommitBlocksFinished, Ev->BlocksFinished)) { + STLOG(PRI_INFO, BLOB_DEPOT, BDTxx, "blocks assimilation complete", (TabletId, Self->TabletID()), + (DecommitGroupId, Self->Config.GetDecommitGroupId())); + Self->ProcessRegisterAgentQ(); + } + } + }; + + Execute(std::make_unique<TTxPutAssimilatedData>(this, ev)); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 10586eb63b8..3e6e66f77ec 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -153,6 +153,9 @@ namespace NKikimr::NBlobDepot { static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1); + // when in decommission mode and not all blocks are yet recovered, then we postpone agent registration + THashMap<TActorId, std::unique_ptr<TEvBlobDepot::TEvRegisterAgent::THandle>> RegisterAgentQ; + struct TAgent { std::optional<TActorId> PipeServerId; std::optional<TActorId> AgentId; @@ -198,6 +201,8 @@ namespace NKikimr::NBlobDepot { void ResetAgent(TAgent& agent); void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev); + void ProcessRegisterAgentQ(); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// void Enqueue(TAutoPtr<IEventHandle>& ev, const TActorContext&) override { @@ -220,6 +225,7 @@ namespace NKikimr::NBlobDepot { void StartOperation() { InitChannelKinds(); StartGroupAssimilator(); + ProcessRegisterAgentQ(); } void OnDetach(const TActorContext&) override { @@ -272,6 +278,7 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Configuration + bool Configured = false; NKikimrBlobDepot::TBlobDepotConfig Config; void Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev); @@ -309,6 +316,7 @@ namespace NKikimr::NBlobDepot { // Group assimilation TActorId RunningGroupAssimilator; + bool DecommitBlocksFinished = false; class TGroupAssimilator; class TGroupAssimilatorFetchMachine; diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 06e335af3af..96ce5a4971d 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -42,7 +42,7 @@ namespace NKikimr::NBlobDepot { void TData::PutKey(TKey key, TValue&& data) { ui64 referencedBytes = 0; - EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) { + EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) { if (!RefCount[id]++) { // first mention of this id auto& record = GetRecordsPerChannelGroup(id); @@ -78,7 +78,7 @@ namespace NKikimr::NBlobDepot { const auto it = Data.find(key); Y_VERIFY(it != Data.end()); TValue& value = it->second; - EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id) { + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](TLogoBlobID id, ui32 /*begin*/, ui32 /*end*/) { const auto it = RefCount.find(id); Y_VERIFY(it != RefCount.end()); if (!--it->second) { diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index bdf96bb06b8..5f106549a32 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -221,7 +221,19 @@ namespace NKikimr::NBlobDepot { item.SetCookie(*cookie); } item.SetKey(key.MakeBinaryKey()); - item.MutableValueChain()->CopyFrom(value.ValueChain); + EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) { + if (begin != end) { + auto *out = item.AddValueChain(); + out->SetGroupId(Self->Info()->GroupFor(id.Channel(), id.Generation())); + LogoBlobIDFromLogoBlobID(id, out->MutableBlobId()); + if (begin) { + out->SetSubrangeBegin(begin); + } + if (end != id.BlobSize()) { + out->SetSubrangeEnd(end); + } + } + }); if (value.Meta) { item.SetMeta(value.Meta.data(), value.Meta.size()); } diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp index 7c6f1353ac2..ffdd206091e 100644 --- a/ydb/core/blob_depot/op_apply_config.cpp +++ b/ydb/core/blob_depot/op_apply_config.cpp @@ -9,7 +9,6 @@ namespace NKikimr::NBlobDepot { class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<IEventHandle> Response; TString ConfigProtobuf; - bool WasConfigured = false; public: TTxApplyConfig(TBlobDepot *self, TEvBlobDepot::TEvApplyConfig& ev, std::unique_ptr<IEventHandle> response, @@ -33,7 +32,6 @@ namespace NKikimr::NBlobDepot { if (!table.IsReady()) { return false; } - WasConfigured = table.IsValid() && table.HaveValue<Schema::Config::ConfigProtobuf>(); db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( NIceDb::TUpdate<Schema::Config::ConfigProtobuf>(ConfigProtobuf) @@ -46,12 +44,12 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT17, "TTxApplyConfig::Complete", (TabletId, Self->TabletID()), - (WasConfigured, WasConfigured)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT17, "TTxApplyConfig::Complete", (TabletId, Self->TabletID())); - if (!WasConfigured) { + if (!std::exchange(Self->Configured, true)) { Self->StartOperation(); } + TActivationContext::Send(Response.release()); } }; diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index d3947b5ab91..7812213678a 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -8,8 +8,6 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::ExecuteTxLoad() { class TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - bool Configured = false; - public: TTxLoad(TBlobDepot *self) : TTransactionBase(self) @@ -31,8 +29,8 @@ namespace NKikimr::NBlobDepot { return false; } else if (table.IsValid()) { if (table.HaveValue<Schema::Config::ConfigProtobuf>()) { - Configured = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>()); - Y_VERIFY(Configured); + Self->Configured = Self->Config.ParseFromString(table.GetValue<Schema::Config::ConfigProtobuf>()); + Y_VERIFY(Self->Configured); } } } @@ -87,9 +85,9 @@ namespace NKikimr::NBlobDepot { void Complete(const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "TTxLoad::Complete", (TabletId, Self->TabletID()), - (Configured, Configured)); + (Configured, Self->Configured)); - if (Configured) { + if (Self->Configured) { Self->StartOperation(); } diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 7e6fd06c1b8..fcb16cebec0 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -158,6 +158,7 @@ namespace NKikimr::NBlobDepot { }; using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; + using TResolvedValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TResolvedValueChain>; template<typename TCallback> void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) { @@ -165,11 +166,11 @@ namespace NKikimr::NBlobDepot { const auto& locator = item.GetLocator(); const auto& blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); if (locator.GetTotalDataLen() + locator.GetFooterLen() > MaxBlobSize) { - callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen())); - callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen())); + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_DATA_BLOB, 0, locator.GetTotalDataLen()), 0, locator.GetTotalDataLen()); + callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_FOOTER_BLOB, 0, locator.GetFooterLen()), 0, 0); } else { callback(blobSeqId.MakeBlobId(tabletId, EBlobType::VG_COMPOSITE_BLOB, 0, locator.GetTotalDataLen() + - locator.GetFooterLen())); + locator.GetFooterLen()), 0, locator.GetTotalDataLen()); } } } diff --git a/ydb/core/blobstorage/base/blobstorage_events.h b/ydb/core/blobstorage/base/blobstorage_events.h index f2f6a6df620..bd36342fbca 100644 --- a/ydb/core/blobstorage/base/blobstorage_events.h +++ b/ydb/core/blobstorage/base/blobstorage_events.h @@ -417,6 +417,14 @@ namespace NKikimr { struct TEvBlobStorage::TEvBunchOfEvents : TEventLocal<TEvBunchOfEvents, EvBunchOfEvents> { std::vector<std::unique_ptr<IEventHandle>> Bunch; + + void Process(IActor *actor) { + const TActorContext& ctx = TActivationContext::AsActorContext(); + for (auto& ev : Bunch) { + TAutoPtr<IEventHandle> handle(ev.release()); + actor->Receive(handle, ctx); + } + } }; struct TEvBlobStorage::TEvAskRestartPDisk : TEventLocal<TEvAskRestartPDisk, EvAskRestartPDisk> { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 78e7039aa67..5ceac59dec0 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -132,7 +132,7 @@ NActors::NLog::EPriority PriorityForStatusResult(NKikimrProto::EReplyStatus stat NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus status); template<typename TDerived> -class TBlobStorageGroupRequestActor : public TActorBootstrapped<TDerived> { +class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActor<TDerived>> { public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BS_GROUP_REQUEST; @@ -142,7 +142,8 @@ public: TIntrusivePtr<TBlobStorageGroupProxyMon> mon, const TActorId& source, ui64 cookie, NWilson::TTraceId traceId, NKikimrServices::EServiceKikimr logComponent, bool logAccEnabled, TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name) - : Info(std::move(info)) + : TActor<TBlobStorageGroupRequestActor<TDerived>>(&TThis::InitialStateFunc, Derived().ActorActivityType()) + , Info(std::move(info)) , GroupQueues(std::move(groupQueues)) , Mon(std::move(mon)) , PoolCounters(storagePoolCounters) @@ -161,6 +162,17 @@ public: .Attribute("RestartCounter", RestartCounter); } + void Registered(TActorSystem *as, const TActorId& parentId) override { + ProxyActorId = parentId; + as->Send(new IEventHandle(TEvents::TSystem::Bootstrap, 0, + TActor<TBlobStorageGroupRequestActor<TDerived>>::SelfId(), parentId, nullptr, 0)); + TActor<TBlobStorageGroupRequestActor<TDerived>>::Registered(as, parentId); + } + + STRICT_STFUNC(InitialStateFunc, + cFunc(TEvents::TSystem::Bootstrap, Derived().Bootstrap); + ) + template<typename T> void CountEvent(const T &ev) const { ERequestType request = TDerived::RequestType(); @@ -258,10 +270,10 @@ public: } // make NodeWarden restart the query just after proxy reconfiguration - const TActorId& proxyId = GetProxyActorId(); Y_VERIFY_DEBUG(RestartCounter < 100); auto q = self.RestartQuery(RestartCounter + 1); ++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)]; + const TActorId& proxyId = MakeBlobStorageProxyID(Info->GroupID); TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span.GetTraceId())); PassAway(); return true; @@ -386,6 +398,10 @@ public: } } + void SendToProxy(std::unique_ptr<IEventBase> event, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { + Derived().Send(ProxyActorId, event.release(), 0, cookie, std::move(traceId)); + } + void SendResponseAndDie(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie) { SendResponse(std::move(ev), timeStats, source, cookie); @@ -396,15 +412,11 @@ public: SendResponseAndDie(std::move(ev), timeStats, Source, Cookie); } - TActorId GetProxyActorId() const { - return MakeBlobStorageProxyID(Info->GroupID); - } - void PassAway() override { // ensure that we are dying for the first time Y_VERIFY(!std::exchange(Dead, true)); TDerived::ActiveCounter(Mon)->Dec(); - Derived().Send(GetProxyActorId(), new TEvDeathNote(Responsiveness)); + SendToProxy(std::make_unique<TEvDeathNote>(Responsiveness)); TActorBootstrapped<TDerived>::PassAway(); } @@ -445,13 +457,12 @@ public: RequestBytes, GeneratedSubrequests, GeneratedSubrequestBytes, Timer.Passed()); } - const TActorId proxyId = GetProxyActorId(); if (timeStats) { - Derived().Send(proxyId, new TEvTimeStats(std::move(*timeStats))); + SendToProxy(std::make_unique<TEvTimeStats>(std::move(*timeStats))); } if (LatencyQueueKind) { - Derived().Send(proxyId, new TEvLatencyReport(*LatencyQueueKind, now - RequestStartTime)); + SendToProxy(std::make_unique<TEvLatencyReport>(*LatencyQueueKind, now - RequestStartTime)); } // KIKIMR-6737 @@ -501,6 +512,8 @@ private: } protected: + using TThis = TDerived; + TIntrusivePtr<TBlobStorageGroupInfo> Info; TIntrusivePtr<TGroupQueues> GroupQueues; TIntrusivePtr<TBlobStorageGroupProxyMon> Mon; @@ -527,6 +540,7 @@ private: THPTimer Timer; std::deque<std::unique_ptr<IEventHandle>> PostponedQ; TBlobStorageGroupInfo::TGroupFailDomains RacingDomains; // a set of domains we've received RACE from + TActorId ProxyActorId; }; void Encrypt(char *destination, const char *source, size_t shift, size_t sizeBytes, const TLogoBlobID &id, @@ -568,7 +582,7 @@ IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroup const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvPatch *ev, ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, - const TActorId &proxyId, bool useVPatch); + bool useVPatch); IActor* CreateBlobStorageGroupMultiGetRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp index 16a4730e535..3cded7156d5 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp @@ -602,8 +602,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB getRequest->IsInternal = true; getRequest->TabletId = TabletId; getRequest->AcquireBlockedGeneration = true; - bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span.GetTraceId()); - Y_VERIFY(isSent); + SendToProxy(std::move(getRequest), 0, Span.GetTraceId()); TotalSent++; A_LOG_DEBUG_S("BSD10", "Sent EvGet logoBlobId# " << logoBlobId.ToString()); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp index 590a92bdf0f..7aa610589a0 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp @@ -570,7 +570,7 @@ public: A_LOG_DEBUG_S("DSPDM17", "sending TEvGet# " << query->ToString()); - SendToBSProxy(SelfId(), Info->GroupID, query.release()); + SendToProxy(std::move(query)); ++RequestsInFlight; } else { GetFinished = true; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp index 5fdc6c30d90..9ab9a472051 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp @@ -286,7 +286,7 @@ public: // we have to process this blob auto query = std::make_unique<TEvBlobStorage::TEvGet>(id, 0, 0, Deadline, HandleClass, true, !ReadBody, ForceBlockedGeneration); query->IsInternal = true; - SendToBSProxy(SelfId(), Info->GroupID, query.release()); + SendToProxy(std::move(query)); GetIssuedFor = id; return; } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index fd4b5e5b5e4..e03405a42c3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -258,15 +258,6 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); void Handle(TEvDeathNote::TPtr ev); - template<typename TEvent> - void HandleCheckAssimilator(TAutoPtr<TEventHandle<TEvent>>& ev) { - if (/*Info->AssimilatorGroupId*/ false) { - //TActivationContext::Send(ev->Forward(MakeBlobStorageProxyID(*Info->AssimilatorGroupId))); - } else { - return HandleNormal(ev); - } - } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Error state @@ -405,7 +396,7 @@ public: STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { - HANDLE_EVENTS(HandleCheckAssimilator); + HANDLE_EVENTS(HandleNormal); default: return StateCommon(ev, ctx); } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp index 1b4868cf265..f3be4f5dd48 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp @@ -168,7 +168,7 @@ class TBlobStorageGroupIndexRestoreGetRequest << " recoverable blob, id# " << Queries[idx].Id.ToString() << " BlobStatus# " << DumpBlobStatus(idx) << " sending EvGet"); - SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0); + SendToProxy(std::move(get)); RestoreQueriesStarted++; } else if (blobState == TBlobStorageGroupInfo::EBS_FULL) { a.Status = NKikimrProto::OK; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp index fa59aff5c1e..d338291c6f9 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp @@ -155,7 +155,7 @@ public: R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx << " isLast# " << isLast << " ev# " << ev->ToString()); - SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId()); + SendToProxy(std::move(ev), cookie, Span.GetTraceId()); if (isLast) { CollectRequestsInFlight++; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp index 7b73f153a35..9b2f53f4c09 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp @@ -122,7 +122,7 @@ public: void SendRequests() { for (; RequestsInFlight < MaxRequestsInFlight && !PendingGets.empty(); ++RequestsInFlight, PendingGets.pop_front()) { auto& [ev, cookie] = PendingGets.front(); - SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId()); + SendToProxy(std::move(ev), cookie, Span.GetTraceId()); } if (!RequestsInFlight && PendingGets.empty()) { auto ev = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, 0, Info->GroupID); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index 9dbba7dd853..d0c5842904a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -33,7 +33,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob static constexpr ui32 TypicalMaxPartsCount = TypicalPartPlacementCount * TypicalPartsInBlob; TString Buffer; - TActorId ProxyActorId; ui32 OriginalGroupId; TLogoBlobID OriginalId; @@ -90,11 +89,10 @@ public: const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvPatch *ev, ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, - const TActorId &proxyId, bool useVPatch = false) + bool useVPatch = false) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_PATCH, false, {}, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Patch") - , ProxyActorId(proxyId) , OriginalGroupId(ev->OriginalGroupId) , OriginalId(ev->OriginalId) , PatchedId(ev->PatchedId) @@ -172,7 +170,7 @@ public: std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline, NKikimrBlobStorage::AsyncBlob, TEvBlobStorage::TEvPut::TacticDefault); put->Orbit = std::move(Orbit); - Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span.GetTraceId()); + SendToProxy(std::move(put), OriginalId.Hash(), Span.GetTraceId()); } void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev) { @@ -528,7 +526,7 @@ public: NKikimrBlobStorage::AsyncRead); get->Orbit = std::move(Orbit); if (OriginalGroupId == Info->GroupID) { - Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span.GetTraceId()); + SendToProxy(std::move(get), PatchedId.Hash(), Span.GetTraceId()); } else { SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span.GetTraceId()); } @@ -824,9 +822,9 @@ IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroup const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvPatch *ev, ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, - const TActorId &proxyId, bool useVPatch) { - return new TBlobStorageGroupPatchRequest(info, state, source, mon, ev, cookie, std::move(traceId), - now, storagePoolCounters, proxyId, useVPatch); + bool useVPatch) { + return new TBlobStorageGroupPatchRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, + storagePoolCounters, useVPatch); } }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index c83f46b1080..a0e20065d1f 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -250,7 +250,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob A_LOG_DEBUG_S("DSR08", "sending TEvGet# " << get->ToString()); - SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span.GetTraceId()); + SendToProxy(std::move(get), 0, Span.GetTraceId()); // switch state Become(&TThis::StateGet); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index 3ef82b2c926..72a6b091049 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -161,7 +161,7 @@ namespace NKikimr { const TActorId reqId = Register( CreateBlobStorageGroupPatchRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), now, - StoragePoolCounters, SelfId(), EnableVPatch.Update(now))); + StoragePoolCounters, EnableVPatch.Update(now))); ActiveRequests.insert(reqId); } @@ -243,11 +243,7 @@ namespace NKikimr { } void TBlobStorageGroupProxy::Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev) { - const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId()); - for (auto& ev : ev->Get()->Bunch) { - TAutoPtr<IEventHandle> handle(ev.release()); - Receive(handle, ctx); - } + ev->Get()->Process(this); } void TBlobStorageGroupProxy::ProcessBatchedPutRequests(TBatchedQueue<TEvBlobStorage::TEvPut::TPtr> &batchedPuts, diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h index 2167005b968..3c532929f8a 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h @@ -127,7 +127,7 @@ struct TDSProxyEnv { std::unique_ptr<IActor> CreatePatchRequestActor(TEvBlobStorage::TEvPatch::TPtr &ev, bool useVPatch = false) { return std::unique_ptr<IActor>(CreateBlobStorageGroupPatchRequest(Info, GroupQueues, ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TInstant::Now(), StoragePoolCounters, - FakeProxyActorId, useVPatch)); + useVPatch)); } }; diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp index aaf73779b16..0e96d053298 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp @@ -530,7 +530,7 @@ void RunNaivePatchTest(TTestBasicRuntime &runtime, const TTestArgs &args, ENaive env.Configure(runtime, args.GType, args.CurrentGroupId, 0); TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args); std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, false); - runtime.Register(patchActor.release()); + runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId); ConductNaivePatch(runtime, args, naiveCase); } @@ -539,7 +539,7 @@ void RunMovedPatchTest(TTestBasicRuntime &runtime, const TTestArgs &args, EMoved env.Configure(runtime, args.GType, args.CurrentGroupId, 0); TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args); std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true); - runtime.Register(patchActor.release()); + runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId); ConductMovedPatch(runtime, env, args, movedCase); } @@ -548,7 +548,7 @@ void RunVPatchTest(TTestBasicRuntime &runtime, const TTestArgs &args, EVPatchCas env.Configure(runtime, args.GType, args.CurrentGroupId, 0); TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args); std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true); - runtime.Register(patchActor.release()); + runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId); ConductVPatch(runtime, env, args, vpatchCase); } @@ -742,7 +742,7 @@ void RunFaultToleranceBlock4Plus2(TTestBasicRuntime &runtime, const TTestArgs &a env.Configure(runtime, args.GType, args.CurrentGroupId, 0); TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args); std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true); - runtime.Register(patchActor.release()); + runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId); ConductFaultTolerance(runtime, env, args, GetFaultToleranceCaseForBlock4Plus2(env, args)); } @@ -751,7 +751,7 @@ void RunFaultToleranceMirror3dc(TTestBasicRuntime &runtime, const TTestArgs &arg env.Configure(runtime, args.GType, args.CurrentGroupId, 0); TEvBlobStorage::TEvPatch::TPtr patch = CreatePatch(runtime, env, args); std::unique_ptr<IActor> patchActor = env.CreatePatchRequestActor(patch, true); - runtime.Register(patchActor.release()); + runtime.Register(patchActor.release(), 0, 0, TMailboxType::Simple, 0, env.FakeProxyActorId); ConductFaultTolerance(runtime, env, args, GetFaultToleranceCaseForMirror3dc(env, args)); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp index cd2072cbc74..d3ca8aa7da9 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp @@ -189,8 +189,8 @@ namespace NKikimr::NStorage { // for group/proxy and ask BSC for group info group.Info.Reset(); RequestGroupConfig(groupId, group); - if (group.ProxyRunning) { - Send(MakeBlobStorageProxyID(groupId), new TEvBlobStorage::TEvConfigureProxy(nullptr)); + if (group.ProxyId) { + Send(group.ProxyId, new TEvBlobStorage::TEvConfigureProxy(nullptr)); } } else if (groupChanged) { // group has changed; obtain main encryption key for this group and try to parse group info from the protobuf @@ -202,13 +202,22 @@ namespace NKikimr::NStorage { STLOG(PRI_ERROR, BS_NODE, NW19, "error while parsing group", (GroupId, groupId), (Err, s)); } - if (group.ProxyRunning) { // update configuration for running proxies + if (group.ProxyId) { // update configuration for running proxies auto info = NeedGroupInfo(groupId); auto counters = info ? DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType()) : nullptr; - Send(MakeBlobStorageProxyID(groupId), new TEvBlobStorage::TEvConfigureProxy(std::move(info), - std::move(counters))); + + if (info && info->BlobDepotId && !group.AgentProxy) { + // re-register proxy as an agent + group.AgentProxy = true; + TActorSystem *as = TActivationContext::ActorSystem(); + group.ProxyId = Register(NBlobDepot::CreateBlobDepotAgent(groupId, info, group.ProxyId), + TMailboxType::ReadAsFilled, AppData()->SystemPoolId); + as->RegisterLocalService(MakeBlobStorageProxyID(groupId), group.ProxyId); + } else { + Send(group.ProxyId, new TEvBlobStorage::TEvConfigureProxy(std::move(info), std::move(counters))); + } } if (const auto& info = group.Info) { @@ -238,9 +247,9 @@ namespace NKikimr::NStorage { if (group.GetEntityStatus() == NKikimrBlobStorage::DESTROY) { if (EjectedGroups.insert(groupId).second) { TGroupRecord& group = Groups[groupId]; - STLOG(PRI_DEBUG, BS_NODE, NW99, "destroying group", (GroupId, groupId), (ProxyRunning, group.ProxyRunning)); - if (group.ProxyRunning) { - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, MakeBlobStorageProxyID(groupId), {}, nullptr, 0)); + STLOG(PRI_DEBUG, BS_NODE, NW99, "destroying group", (GroupId, groupId), (ProxyId, group.ProxyId)); + if (group.ProxyId) { + TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, group.ProxyId, {}, nullptr, 0)); } if (group.GroupResolver) { TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, group.GroupResolver, {}, nullptr, 0)); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 9b72c8dfdc1..53d8f837720 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -348,7 +348,8 @@ namespace NKikimr::NStorage { ui32 MaxKnownGeneration = 0; // maximum seen generation std::optional<NKikimrBlobStorage::TGroupInfo> Group; // group info as a protobuf NKikimrBlobStorage::TGroupInfo EncryptionParams; // latest encryption parameters; set only when encryption enabled; overlay in respect to Group - bool ProxyRunning = false; + TActorId ProxyId; // actor id of running DS proxy or agent + bool AgentProxy = false; // was the group started as an BlobDepot agent proxy? bool GetGroupRequestPending = false; // if true, then we are waiting for GetGroup response for this group bool ProposeRequestPending = false; // if true, then we have sent ProposeKey request and waiting for the group TActorId GroupResolver; // resolver actor id diff --git a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp index 691a15f2ee0..8d29bae0dda 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_mon.cpp @@ -58,7 +58,7 @@ void TNodeWarden::RenderJsonGroupInfo(IOutputStream& out, const std::set<ui32>& groupInfo["Status"] = "ejected"; } else { TGroupRecord& group = Groups[groupId]; - groupInfo["Status"] = group.ProxyRunning ? "started" : "stopped"; + groupInfo["Status"] = group.ProxyId ? "started" : "stopped"; if (const auto& info = group.Info) { groupInfo["Generation"] = info->GroupGeneration; @@ -220,7 +220,7 @@ void TNodeWarden::RenderDSProxies(IOutputStream& out) { ui32 numStarted = 0, numEjected = EjectedGroups.size(); for (const auto& [groupId, group] : Groups) { - numStarted += group.ProxyRunning; + numStarted += bool(group.ProxyId); } H3() { out << "Started DSProxies"; } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp b/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp index e618cf2d553..37660519dbf 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_pipe.cpp @@ -63,7 +63,8 @@ void TNodeWarden::SendRegisterNode() { TVector<ui32> startedDynamicGroups, generations; for (const auto& [groupId, group] : Groups) { - if (group.ProxyRunning && TGroupID(groupId).ConfigurationType() == EGroupConfigurationType::Dynamic) { + if (group.ProxyId && TGroupID(groupId).ConfigurationType() == EGroupConfigurationType::Dynamic && + (!group.Info || group.Info->DecommitStatus != NKikimrBlobStorage::TGroupDecommitStatus::DONE)) { startedDynamicGroups.push_back(groupId); generations.push_back(group.Info ? group.Info->GroupGeneration : 0); } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp index faea46d7334..f44a38d52a5 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp @@ -12,36 +12,61 @@ void TNodeWarden::StartLocalProxy(ui32 groupId) { STLOG(PRI_DEBUG, BS_NODE, NW12, "StartLocalProxy", (GroupId, groupId)); std::unique_ptr<IActor> proxy; + TActorSystem *as = TActivationContext::ActorSystem(); + + TGroupRecord& group = Groups[groupId]; + + auto getCounters = [&](const TIntrusivePtr<TBlobStorageGroupInfo>& info) { + return DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType()); + }; if (EnableProxyMock) { // create mock proxy proxy.reset(CreateBlobStorageGroupProxyMockActor()); } else if (auto info = NeedGroupInfo(groupId)) { - // create proxy with configuration - auto counters = DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType()); - proxy.reset(CreateBlobStorageGroupProxyConfigured(std::move(info), false, DsProxyNodeMon, std::move(counters), - EnablePutBatching, EnableVPatch)); + if (info->BlobDepotId) { + TActorId proxyActorId; + + switch (info->DecommitStatus) { + case NKikimrBlobStorage::TGroupDecommitStatus::NONE: + case NKikimrBlobStorage::TGroupDecommitStatus::PENDING: + Y_FAIL("unexpected DecommitStatus for dynamic group with bound BlobDepotId"); + + case NKikimrBlobStorage::TGroupDecommitStatus::IN_PROGRESS: + // create proxy that will be used by blob depot agent to fetch underlying data + proxyActorId = as->Register(CreateBlobStorageGroupProxyConfigured( + TIntrusivePtr<TBlobStorageGroupInfo>(info), false, DsProxyNodeMon, + getCounters(info), EnablePutBatching, EnableVPatch), TMailboxType::ReadAsFilled, + AppData()->SystemPoolId); + [[fallthrough]]; + case NKikimrBlobStorage::TGroupDecommitStatus::DONE: + proxy.reset(NBlobDepot::CreateBlobDepotAgent(groupId, info, proxyActorId)); + break; + } + } else { + // create proxy with configuration + proxy.reset(CreateBlobStorageGroupProxyConfigured(std::move(info), false, DsProxyNodeMon, getCounters(info), + EnablePutBatching, EnableVPatch)); + } } else { // create proxy without configuration proxy.reset(CreateBlobStorageGroupProxyUnconfigured(groupId, DsProxyNodeMon, EnablePutBatching, EnableVPatch)); } - TActorSystem *as = TActivationContext::ActorSystem(); - as->RegisterLocalService(MakeBlobStorageProxyID(groupId), as->Register(proxy.release(), TMailboxType::ReadAsFilled, - AppData()->SystemPoolId)); + group.ProxyId = as->Register(proxy.release(), TMailboxType::ReadAsFilled, AppData()->SystemPoolId); + as->RegisterLocalService(MakeBlobStorageProxyID(groupId), group.ProxyId); } void TNodeWarden::StartVirtualGroupAgent(ui32 groupId) { STLOG(PRI_DEBUG, BS_NODE, NW40, "StartVirtualGroupProxy", (GroupId, groupId)); TActorSystem *as = TActivationContext::ActorSystem(); - const TActorId actorId = as->Register(NBlobDepot::CreateBlobDepotAgent(groupId), TMailboxType::ReadAsFilled, - AppData()->SystemPoolId); - if (auto info = NeedGroupInfo(groupId)) { - auto counters = DsProxyPerPoolCounters->GetPoolCounters(info->GetStoragePoolName(), info->GetDeviceType()); - Send(actorId, new TEvBlobStorage::TEvConfigureProxy(std::move(info), std::move(counters))); - } - as->RegisterLocalService(MakeBlobStorageProxyID(groupId), actorId); + TGroupRecord& group = Groups[groupId]; + auto info = NeedGroupInfo(groupId); + group.ProxyId = as->Register(NBlobDepot::CreateBlobDepotAgent(groupId, std::move(info), {}), + TMailboxType::ReadAsFilled, AppData()->SystemPoolId); + group.AgentProxy = true; + as->RegisterLocalService(MakeBlobStorageProxyID(groupId), group.ProxyId); } void TNodeWarden::StartStaticProxies() { @@ -64,8 +89,7 @@ void TNodeWarden::HandleForwarded(TAutoPtr<::NActors::IEventHandle> &ev) { TActivationContext::Send(ev->Forward(errorProxy)); TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, errorProxy, {}, nullptr, 0)); return; - } else if (TGroupRecord& group = Groups[id]; !group.ProxyRunning) { - group.ProxyRunning = true; + } else if (TGroupRecord& group = Groups[id]; !group.ProxyId) { if (TGroupID(id).ConfigurationType() == EGroupConfigurationType::Virtual) { StartVirtualGroupAgent(id); } else { @@ -79,7 +103,7 @@ void TNodeWarden::HandleForwarded(TAutoPtr<::NActors::IEventHandle> &ev) { void TNodeWarden::Handle(NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate::TPtr ev) { const auto& record = ev->Get()->Record; const ui32 groupId = record.GetGroupID(); - if (const auto it = Groups.find(groupId); it != Groups.end() && it->second.ProxyRunning) { + if (const auto it = Groups.find(groupId); it != Groups.end() && it->second.ProxyId) { TActivationContext::Send(ev->Forward(WhiteboardId)); } } diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 7cf3743c679..ac923ff0dfb 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -47,6 +47,12 @@ message TGivenIdRange { repeated TChannelRange ChannelRanges = 1; } +message TResolvedValueChain { + optional uint32 GroupId = 1; + optional NKikimrProto.TLogoBlobID BlobId = 2; + optional uint32 SubrangeBegin = 3; + optional uint32 SubrangeEnd = 4; +} @@ -190,7 +196,7 @@ message TEvResolveResult { message TResolvedKey { optional uint64 Cookie = 1; optional bytes Key = 2; - repeated TValueChain ValueChain = 3; + repeated TResolvedValueChain ValueChain = 3; optional bytes Meta = 4; repeated uint64 Owners = 5; } |