diff options
author | alexvru <alexvru@ydb.tech> | 2022-08-13 23:14:43 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-08-13 23:14:43 +0300 |
commit | 77b7e132d724f31ae97a0438cc39fa87b39d968b (patch) | |
tree | bda2e13eb7b34828d69218f17855950b02be98bc | |
parent | 167bbfedbd3a21f0da22ca76f6c81772c6acb9ec (diff) | |
download | ydb-77b7e132d724f31ae97a0438cc39fa87b39d968b.tar.gz |
BlobDepot work in progress: support assimilation through ds proxy
50 files changed, 991 insertions, 971 deletions
diff --git a/ydb/core/base/blobstorage.cpp b/ydb/core/base/blobstorage.cpp index 8ce42d7ea9..b18215f508 100644 --- a/ydb/core/base/blobstorage.cpp +++ b/ydb/core/base/blobstorage.cpp @@ -115,6 +115,11 @@ std::unique_ptr<TEvBlobStorage::TEvStatusResult> TEvBlobStorage::TEvStatus::Make return res; } +std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> TEvBlobStorage::TEvAssimilate::MakeErrorResponse( + NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) { + return std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, errorReason); +} + }; template<> diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index cad14e35e8..50b464aaf5 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -459,6 +459,7 @@ struct TEvBlobStorage { EvVBaldSyncLog, EvPatch, EvInplacePatch, + EvAssimilate, // EvPutResult = EvPut + 512, /// 268 632 576 @@ -472,6 +473,7 @@ struct TEvBlobStorage { EvVBaldSyncLogResult, EvPatchResult, EvInplacePatchResult, + EvAssimilateResult, // proxy <-> vdisk interface EvVPut = EvPut + 2 * 512, /// 268 633 088 @@ -868,6 +870,7 @@ struct TEvBlobStorage { struct TEvStatusResult; struct TEvPatchResult; struct TEvInplacePatchResult; + struct TEvAssimilateResult; struct TEvPut : public TEventLocal<TEvPut, EvPut> { enum ETactic { @@ -1994,6 +1997,170 @@ struct TEvBlobStorage { } }; + struct TEvAssimilate : TEventLocal<TEvAssimilate, EvAssimilate> { + std::optional<ui64> SkipBlocksUpTo; + std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo; + std::optional<TLogoBlobID> SkipBlobsUpTo; + ui32 RestartCounter = 0; + + TEvAssimilate(std::optional<ui64> skipBlocksUpTo, std::optional<std::tuple<ui64, ui8>> skipBarriersUpTo, + std::optional<TLogoBlobID> skipBlobsUpTo) + : SkipBlocksUpTo(skipBlocksUpTo) + , SkipBarriersUpTo(skipBarriersUpTo) + , SkipBlobsUpTo(skipBlobsUpTo) + {} + + TString Print(bool /*isFull*/) const { + return ToString(); + } + + TString ToString() const { + TStringStream str; + str << "TEvAssimilate {"; + const char *prefix = ""; + if (SkipBlocksUpTo) { + str << std::exchange(prefix, " ") << "SkipBlocksUpTo# " << *SkipBlocksUpTo; + } + if (SkipBarriersUpTo) { + str << std::exchange(prefix, " " ) << "SkipBarriersUpTo# " << std::get<0>(*SkipBarriersUpTo) + << ":" << int(std::get<1>(*SkipBarriersUpTo)); + } + if (SkipBlobsUpTo) { + str << std::exchange(prefix, " " ) << "SkipBlobsUpTo# "; + SkipBlobsUpTo->Out(str); + } + str << "}"; + return str.Str(); + } + + ui32 CalculateSize() const { + return sizeof(*this); + } + + std::unique_ptr<TEvAssimilateResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason, + ui32 groupId); + }; + + struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> { + struct TBlock { + ui64 TabletId; + ui32 BlockedGeneration; + + TString ToString() const { + TStringStream str; + Output(str); + return str.Str(); + } + + void Output(IOutputStream& s) const { + s << "{" << TabletId << "=>" << BlockedGeneration << "}"; + } + }; + + struct TBarrier { + struct TValue { + ui32 RecordGeneration; + ui32 PerGenerationCounter; + ui32 CollectGeneration; + ui32 CollectStep; + + void Output(IOutputStream& s) const { + if (RecordGeneration || PerGenerationCounter || CollectGeneration || CollectGeneration) { + s << "{" << RecordGeneration << ":" << PerGenerationCounter << "=>" << CollectGeneration + << ":" << CollectStep << "}"; + } + } + }; + + ui64 TabletId; + ui8 Channel; + TValue Soft; + TValue Hard; + + TString ToString() const { + TStringStream str; + Output(str); + return str.Str(); + } + + void Output(IOutputStream& s) const { + s << "{" << TabletId << ":" << int(Channel) << "=>soft"; + Soft.Output(s); + s << "/hard"; + Hard.Output(s); + s << "}"; + } + }; + + struct TBlob { + TLogoBlobID Id; + bool Keep; + bool DoNotKeep; + + TString ToString() const { + TStringStream str; + Output(str); + return str.Str(); + } + + void Output(IOutputStream& s) const { + Id.Out(s); + if (Keep) { + s << "k"; + } + if (DoNotKeep) { + s << "d"; + } + } + }; + + NKikimrProto::EReplyStatus Status; + TString ErrorReason; + std::deque<TBlock> Blocks; + std::deque<TBarrier> Barriers; + std::deque<TBlob> Blobs; + + TEvAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason = {}) + : Status(status) + , ErrorReason(std::move(errorReason)) + {} + + TString Print(bool isFull) const { + TStringStream str; + str << "TEvAssimilateResult {" + << "Status# " << NKikimrProto::EReplyStatus_Name(Status) + << " ErrorReason# '" << ErrorReason << "'"; + + auto out = [&](const char *name, auto& container) { + str << " " << name << "# "; + if (isFull) { + str << "["; + for (auto it = container.begin(); it != container.end(); ++it) { + if (it != container.begin()) { + str << " "; + } + it->Output(str); + } + str << "]"; + } else { + str << "size=" << container.size(); + } + }; + + out("Blocks", Blocks); + out("Barriers", Barriers); + out("Blobs", Blobs); + + str << "}"; + + return str.Str(); + } + + TString ToString() const { + return Print(false); + } + }; + struct TEvConfigureProxy; struct TEvUpdateGroupInfo; diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index 330f795866..8c20c98b6e 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -19,8 +19,6 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator_copier.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator_fetch_machine.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index 272df7ae92..792491b5b4 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -118,10 +118,10 @@ namespace NKikimr::NBlobDepot { std::make_heap(Channels.begin(), Channels.end(), TChannelCompare()); } - std::pair<ui8, ui64> PickChannelBlobSeq() { + std::tuple<ui8, ui64> PickChannelBlobSeq() { std::pop_heap(Channels.begin(), Channels.end(), TChannelCompare()); TChannelInfo *channel = Channels.back(); - auto res = std::make_pair(channel->Index, channel->NextBlobSeqId++); + auto res = std::make_tuple(channel->Index, channel->NextBlobSeqId++); std::push_heap(Channels.begin(), Channels.end(), TChannelCompare()); Size += 4 << 20; // assume each written blob of this size in a first approximation return res; diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt index 8ebb4a51ef..86891c8205 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.txt @@ -35,4 +35,5 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_collect_garbage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_status.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_patch.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_assimilate.cpp ) diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 25e1b93702..40c4fe2283 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -13,6 +13,7 @@ namespace NKikimr::NBlobDepot { XX(EvCollectGarbage) \ XX(EvStatus) \ XX(EvPatch) \ + XX(EvAssimilate) \ // END class TBlobDepotAgent; @@ -270,7 +271,7 @@ namespace NKikimr::NBlobDepot { void IssueGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto); ui32 GetNumAvailableItems() const; std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent); - std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, + std::tuple<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, ui32 part, ui32 size) const; void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep); void RebuildHeap(); diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index 06efcfbff0..864b5a1266 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -67,7 +67,7 @@ namespace NKikimr::NBlobDepot { }); } - std::pair<ui32, ui64> TBlobDepotAgent::TBlocksManager::GetBlockForTablet(ui64 tabletId) { + std::tuple<ui32, ui64> TBlobDepotAgent::TBlocksManager::GetBlockForTablet(ui64 tabletId) { if (const auto it = Blocks.find(tabletId); it != Blocks.end()) { const auto& record = it->second; return {record.BlockedGeneration, record.IssuerGuid}; diff --git a/ydb/core/blob_depot/agent/blocks.h b/ydb/core/blob_depot/agent/blocks.h index ddf79bac88..551465fbde 100644 --- a/ydb/core/blob_depot/agent/blocks.h +++ b/ydb/core/blob_depot/agent/blocks.h @@ -40,7 +40,7 @@ namespace NKikimr::NBlobDepot { void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override; void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg); void IssueOnUpdateBlock(TBlock& block, bool success); - std::pair<ui32, ui64> GetBlockForTablet(ui64 tabletId); + std::tuple<ui32, ui64> GetBlockForTablet(ui64 tabletId); void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive); void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets); }; diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp index a5be21c0b6..544b6d5244 100644 --- a/ydb/core/blob_depot/agent/channel_kind.cpp +++ b/ydb/core/blob_depot/agent/channel_kind.cpp @@ -55,7 +55,7 @@ namespace NKikimr::NBlobDepot { return TBlobSeqId::FromSequentalNumber(channel, agent.BlobDepotGeneration, value); } - std::pair<TLogoBlobID, ui32> TBlobDepotAgent::TChannelKind::MakeBlobId(TBlobDepotAgent& agent, + std::tuple<TLogoBlobID, ui32> TBlobDepotAgent::TChannelKind::MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, ui32 part, ui32 size) const { auto id = blobSeqId.MakeBlobId(agent.TabletId, type, part, size); const auto [channel, groupId] = ChannelGroups[ChannelToIndex[blobSeqId.Channel]]; diff --git a/ydb/core/blob_depot/agent/storage_assimilate.cpp b/ydb/core/blob_depot/agent/storage_assimilate.cpp new file mode 100644 index 0000000000..05964b1ca0 --- /dev/null +++ b/ydb/core/blob_depot/agent/storage_assimilate.cpp @@ -0,0 +1,26 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + template<> + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvAssimilate>(std::unique_ptr<IEventHandle> ev) { + class TAssimilateQuery : public TQuery { + public: + using TQuery::TQuery; + + void Initiate() override { + Y_VERIFY(Agent.ProxyId); + const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId)); + Y_VERIFY(sent); + delete this; + } + + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse /*response*/) override { + Y_FAIL(); + } + }; + + return new TAssimilateQuery(*this, std::move(ev)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index ef3c2b9913..7f7e218cdb 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -1,204 +1,127 @@ #include "blob_depot_tablet.h" -#include "assimilator_fetch_machine.h" -#include "assimilator_copier.h" -#include "schema.h" +#include "assimilator.h" #include "blocks.h" #include "garbage_collection.h" #include "data.h" namespace NKikimr::NBlobDepot { - class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> { + using TAssimilator = TBlobDepot::TGroupAssimilator; + + struct TStateBits { enum { - EvReconnectToController = EventSpaceBegin(TEvents::ES_PRIVATE), + Blocks = 1, + Barriers = 2, + Blobs = 4, }; + }; - const ui32 GroupId; - const NKikimrBlobDepot::TBlobDepotConfig Config; - const ui64 TabletId; - std::optional<TString> AssimilatorState; - TActorId BlobDepotId; - TIntrusivePtr<TBlobStorageGroupInfo> Info; - - public: - TGroupAssimilator(const NKikimrBlobDepot::TBlobDepotConfig& config, ui64 tabletId, std::optional<TString> assimilatorState) - : GroupId(config.GetDecommitGroupId()) - , Config(config) - , TabletId(tabletId) - , AssimilatorState(std::move(assimilatorState)) - { - Y_VERIFY(Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup); - } - - void Bootstrap(TActorId parentId) { - BlobDepotId = parentId; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "TGroupAssimilator::Bootstrap", (GroupId, GroupId)); - QueryGroupConfiguration(); - } - - void PassAway() override { - FetchMachine->OnPassAway(); - TActorBootstrapped::PassAway(); + void TAssimilator::Bootstrap() { + if (Token.expired()) { + return PassAway(); } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // BSC interaction - - TActorId ControllerPipeId; - - void QueryGroupConfiguration() { - TGroupID groupId(GroupId); - if (groupId.ConfigurationType() != EGroupConfigurationType::Dynamic) { - AbortWithError("group configuration type is not dynamic"); + const std::optional<TString>& state = Self->AssimilatorState; + if (state) { + TStringInput stream(*state); + ui8 stateBits; + Load(&stream, stateBits); + if (stateBits & TStateBits::Blocks) { + Load(&stream, SkipBlocksUpTo.emplace()); } - - const ui64 controllerId = MakeBSControllerID(groupId.AvailabilityDomainID()); - ControllerPipeId = Register(NTabletPipe::CreateClient(SelfId(), controllerId)); - Become(&TThis::StateQueryController); - } - - STRICT_STFUNC(StateQueryController, - cFunc(TEvents::TSystem::Poison, PassAway); - - cFunc(EvReconnectToController, QueryGroupConfiguration); - hFunc(TEvTabletPipe::TEvClientConnected, Handle); - hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); - hFunc(TEvBlobStorage::TEvControllerNodeServiceSetUpdate, Handle); - ); - - void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT00, "TGroupAssimilator::TEvClientConnected", (GroupId, GroupId)); - Y_VERIFY(ev->Get()->ClientId == ControllerPipeId); - if (ev->Get()->Status == NKikimrProto::OK) { - NTabletPipe::SendData(SelfId(), ControllerPipeId, new TEvBlobStorage::TEvControllerGetGroup(0, GroupId)); - } else { - Reconnect(); + if (stateBits & TStateBits::Barriers) { + Load(&stream, SkipBarriersUpTo.emplace()); } - } - - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "TGroupAssimilator::TEvClientDestroyed", (GroupId, GroupId)); - Y_VERIFY(ev->Get()->ClientId == ControllerPipeId); - Reconnect(); - } - - void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "TGroupAssimilator::TEvControllerNodeServiceSetUpdate", (GroupId, GroupId), - (Msg, ev->Get()->Record)); - NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId); - - auto& record = ev->Get()->Record; - if (record.HasStatus() && record.GetStatus() == NKikimrProto::OK && record.HasServiceSet()) { - const auto& ss = record.GetServiceSet(); - for (const auto& group : ss.GetGroups()) { - if (group.GetGroupID() == GroupId) { - if (group.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) { - return AbortWithError("the group being decommitted was destroyed"); - } else if (!group.HasBlobDepotId() || group.GetBlobDepotId() != TabletId) { - return AbortWithError("inconsistent decommission state"); - } else { - Info = TBlobStorageGroupInfo::Parse(group, nullptr, nullptr); - StartAssimilation(); - return; - } - } - } + if (stateBits & TStateBits::Blobs) { + Load(&stream, SkipBlobsUpTo.emplace()); } - - // retry operation in some time - Reconnect(); - } - - void Reconnect() { - NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId); - TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(EvReconnectToController, 0, - SelfId(), {}, nullptr, 0)); - } - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - std::unique_ptr<TGroupAssimilatorFetchMachine> FetchMachine; - - void StartAssimilation() { - Become(&TThis::StateAssimilate); - FetchMachine = std::make_unique<TGroupAssimilatorFetchMachine>(SelfId(), Info, BlobDepotId, - std::move(AssimilatorState)); } - void StateAssimilate(STFUNC_SIG) { - Y_UNUSED(ctx); + SendRequest(); + Become(&TThis::StateFunc); + } - switch (ev->GetTypeRewrite()) { - cFunc(TEvents::TSystem::Poison, PassAway); - IgnoreFunc(TEvTabletPipe::TEvClientDestroyed); + void TAssimilator::PassAway() { + } - default: - return FetchMachine->Handle(ev); - } + STATEFN(TAssimilator::StateFunc) { + if (Token.expired()) { + return PassAway(); } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - void AbortWithError(TString error) { - STLOG(PRI_ERROR, BLOB_DEPOT, BDT34, "failed to assimilate group", (GroupId, GroupId), (Error, error)); - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, BlobDepotId, SelfId(), nullptr, 0)); - PassAway(); - } - }; + switch (const ui32 type = ev->GetTypeRewrite()) { + hFunc(TEvBlobStorage::TEvAssimilateResult, Handle); - void TBlobDepot::StartGroupAssimilator() { - if (!RunningGroupAssimilator && Config.HasDecommitGroupId()) { - CopierId = RegisterWithSameMailbox(new TGroupAssimilatorCopierActor(this)); - RunningGroupAssimilator = Register(new TGroupAssimilator(Config, TabletID(), AssimilatorState)); + default: + Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type); + STLOG(PRI_CRIT, BLOB_DEPOT, BDTxx, "unexpected event", (Id, Self->GetLogId()), (Type, type)); + break; } } - void TBlobDepot::HandleGone(TAutoPtr<IEventHandle> ev) { - if (ev->Sender == RunningGroupAssimilator) { - RunningGroupAssimilator = {}; - } else { - Y_FAIL("unexpected event"); - } + void TAssimilator::SendRequest() { + SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), new TEvBlobStorage::TEvAssimilate(SkipBlocksUpTo, + SkipBarriersUpTo, SkipBlobsUpTo)); } - void TBlobDepot::Handle(TEvAssimilatedData::TPtr ev) { + void TAssimilator::Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev) { class TTxPutAssimilatedData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - std::unique_ptr<TEvAssimilatedData> Ev; + TAssimilator *Self; + std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> Ev; bool UnblockRegisterActorQ = false; public: - TTxPutAssimilatedData(TBlobDepot *self, TEvAssimilatedData::TPtr ev) - : TTransactionBase(self) + TTxPutAssimilatedData(TAssimilator *self, TEvBlobStorage::TEvAssimilateResult::TPtr ev) + : TTransactionBase(self->Self) + , Self(self) , Ev(ev->Release().Release()) {} bool Execute(TTransactionContext& txc, const TActorContext&) override { NIceDb::TNiceDb db(txc.DB); + const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty(); + const bool blocksFinished = Ev->Blocks.empty() || !Ev->Barriers.empty() || !Ev->Blobs.empty() || done; + const bool barriersFinished = Ev->Barriers.empty() || !Ev->Blobs.empty() || done; + + if (const auto& blocks = Ev->Blocks; !blocks.empty()) { + Self->SkipBlocksUpTo = blocks.back().TabletId; + } + if (const auto& barriers = Ev->Barriers; !barriers.empty()) { + Self->SkipBarriersUpTo = {barriers.back().TabletId, barriers.back().Channel}; + } + if (const auto& blobs = Ev->Blobs; !blobs.empty()) { + Self->SkipBlobsUpTo = blobs.back().Id; + } + for (const auto& block : Ev->Blocks) { - Self->BlocksManager->AddBlockOnDecommit(block, txc); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block)); + Self->Self->BlocksManager->AddBlockOnDecommit(block, txc); } for (const auto& barrier : Ev->Barriers) { - Self->BarrierServer->AddBarrierOnDecommit(barrier, txc); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier)); + Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc); } for (const auto& blob : Ev->Blobs) { - Self->Data->AddDataOnDecommit(blob, txc); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob)); + Self->Self->Data->AddDataOnDecommit(blob, txc); } - if (Ev->BlocksFinished && Self->DecommitState < EDecommitState::BlocksFinished) { - Self->DecommitState = EDecommitState::BlocksFinished; + auto& decommitState = Self->Self->DecommitState; + if (blocksFinished && decommitState < EDecommitState::BlocksFinished) { + decommitState = EDecommitState::BlocksFinished; UnblockRegisterActorQ = true; } - if (Ev->BarriersFinished && Self->DecommitState < EDecommitState::BarriersFinished) { - Self->DecommitState = EDecommitState::BarriersFinished; + if (barriersFinished && decommitState < EDecommitState::BarriersFinished) { + decommitState = EDecommitState::BarriersFinished; } - if (Ev->BlobsFinished && Self->DecommitState < EDecommitState::BlobsFinished) { - Self->DecommitState = EDecommitState::BlobsFinished; + if (done && decommitState < EDecommitState::BlobsFinished) { + decommitState = EDecommitState::BlobsFinished; } db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update( - NIceDb::TUpdate<Schema::Config::DecommitState>(Self->DecommitState), - NIceDb::TUpdate<Schema::Config::AssimilatorState>(Ev->AssimilatorState) + NIceDb::TUpdate<Schema::Config::DecommitState>(decommitState), + NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState()) ); return true; @@ -206,32 +129,49 @@ namespace NKikimr::NBlobDepot { void Complete(const TActorContext&) override { if (UnblockRegisterActorQ) { - STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->GetLogId()), - (DecommitGroupId, Self->Config.GetDecommitGroupId())); - Self->ProcessRegisterAgentQ(); + STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->Self->GetLogId()), + (DecommitGroupId, Self->Self->Config.GetDecommitGroupId())); + Self->Self->ProcessRegisterAgentQ(); } - if (EDecommitState::BlobsFinished <= Self->DecommitState) { - // finished metadata replication, time to kill the assimilator - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, Self->RunningGroupAssimilator, - Self->SelfId(), nullptr, 0)); - Self->RunningGroupAssimilator = {}; + if (EDecommitState::BlobsFinished <= Self->Self->DecommitState) { + // finished metadata replication } else { - TActivationContext::Send(new IEventHandle(TEvPrivate::EvAssimilatedDataConfirm, 0, - Self->RunningGroupAssimilator, Self->SelfId(), nullptr, 0)); // ask assimilator to resume process + Self->SendRequest(); } } }; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "received TEvAssimilatedData", (Id, GetLogId()), - (Blocks.size, ev->Get()->Blocks.size()), (Barriers.size, ev->Get()->Barriers.size()), - (Blobs.size, ev->Get()->Blobs.size()), (BlocksFinished, ev->Get()->BlocksFinished), - (BarriersFinished, ev->Get()->BarriersFinished), (BlobsFinished, ev->Get()->BlobsFinished)); - Execute(std::make_unique<TTxPutAssimilatedData>(this, ev)); + Self->Execute(std::make_unique<TTxPutAssimilatedData>(this, ev)); } - void TBlobDepot::ProcessAssimilatedData(TEvAssimilatedData& msg) { - (void)msg; + TString TAssimilator::SerializeAssimilatorState() const { + TStringStream stream; + + const ui8 stateBits = (SkipBlocksUpTo ? TStateBits::Blocks : 0) + | (SkipBarriersUpTo ? TStateBits::Barriers : 0) + | (SkipBlocksUpTo ? TStateBits::Blocks : 0); + + Save(&stream, stateBits); + + if (SkipBlocksUpTo) { + Save(&stream, *SkipBlocksUpTo); + } + if (SkipBarriersUpTo) { + Save(&stream, *SkipBarriersUpTo); + } + if (SkipBlobsUpTo) { + Save(&stream, *SkipBlobsUpTo); + } + + return stream.Str(); + } + + void TBlobDepot::StartGroupAssimilator() { + if (Config.HasDecommitGroupId()) { + Y_VERIFY(!GroupAssimilatorId); + GroupAssimilatorId = RegisterWithSameMailbox(new TGroupAssimilator(this)); + } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h new file mode 100644 index 0000000000..bab6d171bf --- /dev/null +++ b/ydb/core/blob_depot/assimilator.h @@ -0,0 +1,34 @@ +#pragma once + +#include "defs.h" +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> { + std::weak_ptr<TToken> Token; + TBlobDepot *Self; + + std::optional<ui64> SkipBlocksUpTo; + std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo; + std::optional<TLogoBlobID> SkipBlobsUpTo; + + public: + TGroupAssimilator(TBlobDepot *self) + : Token(self->Token) + , Self(self) + { + Y_VERIFY(Self->Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup); + } + + void Bootstrap(); + void PassAway() override; + STATEFN(StateFunc); + + private: + void SendRequest(); + void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev); + TString SerializeAssimilatorState() const; + }; + +} // NKikimrBlobDepot::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator_copier.cpp b/ydb/core/blob_depot/assimilator_copier.cpp deleted file mode 100644 index 11456969b0..0000000000 --- a/ydb/core/blob_depot/assimilator_copier.cpp +++ /dev/null @@ -1,31 +0,0 @@ -#include "assimilator_copier.h" - -namespace NKikimr::NBlobDepot { - - using TCopierActor = TBlobDepot::TGroupAssimilatorCopierActor; - - TCopierActor::TGroupAssimilatorCopierActor(TBlobDepot *self) - : Self(self) - , Token(self->Token) - {} - - void TCopierActor::Bootstrap() { - Become(&TThis::StateFunc); - } - - STATEFN(TCopierActor::StateFunc) { - if (Token.expired()) { - return; // BlobDepot tablet is already dead, we can't access its internals - } - - switch (const ui32 type = ev->GetTypeRewrite()) { - cFunc(TEvents::TSystem::Poison, PassAway); - - default: - Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type); - STLOG(PRI_CRIT, BLOB_DEPOT, BDT46, "unexpected event", (Id, Self->GetLogId()), (Type, type)); - break; - } - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator_copier.h b/ydb/core/blob_depot/assimilator_copier.h deleted file mode 100644 index 6f4926b75c..0000000000 --- a/ydb/core/blob_depot/assimilator_copier.h +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once - -#include "defs.h" - -#include "blob_depot_tablet.h" - -namespace NKikimr::NBlobDepot { - - class TBlobDepot::TGroupAssimilatorCopierActor : public TActorBootstrapped<TGroupAssimilatorCopierActor> { - TBlobDepot* const Self; - std::weak_ptr<TBlobDepot::TToken> Token; - - public: - TGroupAssimilatorCopierActor(TBlobDepot *self); - void Bootstrap(); - STATEFN(StateFunc); - }; - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.cpp b/ydb/core/blob_depot/assimilator_fetch_machine.cpp deleted file mode 100644 index 2edf40b598..0000000000 --- a/ydb/core/blob_depot/assimilator_fetch_machine.cpp +++ /dev/null @@ -1,382 +0,0 @@ -#include "assimilator_fetch_machine.h" - -namespace NKikimr::NBlobDepot { - - using TFetchMachine = TBlobDepot::TGroupAssimilatorFetchMachine; - - struct TStateMask { - enum { - Block = 1, - Barrier = 2, - Blob = 4, - }; - }; - - TFetchMachine::TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info, - TActorId blobDepotId, const std::optional<TString>& assimilatorState) - : Self(self) - , Info(std::move(info)) - , BlobDepotId(blobDepotId) - { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT38, "TGroupAssimilatorFetchMachine start", (GroupId, Info->GroupID)); - - PerDiskState.resize(Info->GetTotalVDisksNum()); - for (ui32 i = 0; i < PerDiskState.size(); ++i) { - const TActorId actorId = Info->GetActorId(i); - const ui32 nodeId = actorId.NodeId(); - TNodeInfo& node = Nodes[nodeId]; - node.OrderNumbers.push_back(i); - } - - for (const auto& [nodeId, node] : Nodes) { - if (nodeId != Self.NodeId()) { - TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0, - TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0)); - } else { - for (const ui32 orderNumber : node.OrderNumbers) { - IssueAssimilateCmdToVDisk(orderNumber); - } - } - } - - if (assimilatorState) { - TStringInput stream(*assimilatorState); - ui8 mask = 0; - Load(&stream, mask); - if (mask & TStateMask::Block) { - Load(&stream, LastProcessedBlock.emplace()); - } - if (mask & TStateMask::Barrier) { - Load(&stream, LastProcessedBarrier.emplace()); - } - if (mask & TStateMask::Blob) { - Load(&stream, LastProcessedBlob.emplace()); - } - } - } - - void TFetchMachine::Handle(TAutoPtr<IEventHandle>& ev) { - switch (const ui32 type = ev->GetTypeRewrite()) { - hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle); - hFunc(TEvents::TEvUndelivered, Handle); - hFunc(TEvInterconnect::TEvNodeConnected, Handle); - hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); - cFunc(TEvPrivate::EvAssimilatedDataConfirm, HandleAssimilateDataConfirm); - - default: - Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type); - } - } - - void TFetchMachine::OnPassAway() { - for (const auto& [nodeId, node] : Nodes) { - if (nodeId != Self.NodeId()) { - TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0, - TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0)); - } - } - } - - void TFetchMachine::IssueAssimilateCmdToVDisk(ui32 orderNumber) { - const TActorId actorId = Info->GetActorId(orderNumber); - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT39, "IssueAssimilateCmdToVDisk", (GroupId, Info->GroupID), - (OrderNumber, orderNumber), (ActorId, actorId)); - - TPerDiskState& state = PerDiskState[orderNumber]; - Y_VERIFY(!state.Finished); - Y_VERIFY(!state.RequestInFlight); - state.RequestInFlight = true; - - auto maxOpt = [](const auto& x, const auto& y) { return !y ? *x : !x ? *y : *x < *y ? *y : *x; }; - - auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(Info->GetVDiskId(orderNumber)); - auto& record = ev->Record; - if (state.LastBlock || LastProcessedBlock) { - record.SetSkipBlocksUpTo(maxOpt(state.LastBlock, LastProcessedBlock)); - } - if (state.LastBarrier || LastProcessedBarrier) { - auto *x = record.MutableSkipBarriersUpTo(); - const auto& [tabletId, channel] = maxOpt(state.LastBarrier, LastProcessedBarrier); - x->SetTabletId(tabletId); - x->SetChannel(channel); - } - if (state.LastBlob || LastProcessedBlob) { - LogoBlobIDFromLogoBlobID(maxOpt(state.LastBlob, LastProcessedBlob), record.MutableSkipBlobsUpTo()); - } - - const ui64 id = ++LastRequestId; - Self.Send(actorId, ev.release(), IEventHandle::FlagTrackDelivery, id); - - const auto [it, inserted] = RequestsInFlight.emplace(id, TRequestInFlight{orderNumber}); - Y_VERIFY(inserted); - - const ui32 nodeId = actorId.NodeId(); - Nodes[nodeId].RequestsInFlight.insert(&*it); - } - - void TFetchMachine::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) { - const ui32 nodeId = ev->Get()->NodeId; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT40, "NodeConnected", (GroupId, Info->GroupID), (NodeId, nodeId)); - for (const ui32 orderNumber : Nodes[nodeId].OrderNumbers) { - if (auto& state = PerDiskState[orderNumber]; !state.Finished) { - IssueAssimilateCmdToVDisk(orderNumber); - } - } - } - - void TFetchMachine::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) { - const ui32 nodeId = ev->Get()->NodeId; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT41, "NodeDisconnected", (GroupId, Info->GroupID), (NodeId, nodeId)); - for (const auto *kv : std::exchange(Nodes[nodeId].RequestsInFlight, {})) { - TPerDiskState& state = PerDiskState[kv->second.OrderNumber]; - Y_VERIFY(state.RequestInFlight); - state.RequestInFlight = false; - const size_t num = RequestsInFlight.erase(kv->first); - Y_VERIFY(num == 1); - } - Merge(); - TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0, - TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0)); - } - - void TFetchMachine::Handle(TEvents::TEvUndelivered::TPtr ev) { - if (ev->Get()->SourceType == TEvBlobStorage::EvVAssimilate) { - // TODO: undelivery may be caused by moving VDisk actor out, handle it - EndRequest(ev->Cookie); - Merge(); - } - } - - ui32 TFetchMachine::EndRequest(ui64 id) { - const auto it = RequestsInFlight.find(id); - Y_VERIFY(it != RequestsInFlight.end()); - const ui32 orderNumber = it->second.OrderNumber; - TPerDiskState& state = PerDiskState[orderNumber]; - Y_VERIFY(state.RequestInFlight); - state.RequestInFlight = false; - const TActorId actorId = Info->GetActorId(orderNumber); - const ui32 nodeId = actorId.NodeId(); - TNodeInfo& node = Nodes[nodeId]; - const size_t num = node.RequestsInFlight.erase(&*it); - Y_VERIFY(num == 1); - RequestsInFlight.erase(it); - return orderNumber; - } - - void TFetchMachine::Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) { - const ui32 orderNumber = EndRequest(ev->Cookie); - const auto& record = ev->Get()->Record; - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "EvVAssimilate", (GroupId, Info->GroupID), (Id, ev->Cookie), - (OrderNumber, orderNumber), (Status, record.GetStatus()), (Blocks.size, record.BlocksSize()), - (Barriers.size, record.BarriersSize()), (Blobs.size, record.BlobsSize())); - - TPerDiskState& state = PerDiskState[orderNumber]; - - const bool wasExhausted = state.Exhausted(); - - for (const auto& item : record.GetBlocks()) { - const ui64 tabletId = item.GetTabletId(); - state.LastBlock.emplace(tabletId); - if (!LastProcessedBlock || *LastProcessedBlock <= *state.LastBlock) { - state.Blocks.emplace_back(item); - } - } - for (const auto& item : record.GetBarriers()) { - const ui64 tabletId = item.GetTabletId(); - const ui8 channel = item.GetChannel(); - state.LastBarrier.emplace(tabletId, channel); - if (!LastProcessedBarrier || *LastProcessedBarrier <= *state.LastBarrier) { - state.Barriers.emplace_back(item); - } - } - ui64 raw[3] = {0, 0, 0}; - for (const auto& item : record.GetBlobs()) { - if (item.HasRawX1()) { - raw[0] = item.GetRawX1(); - } else if (item.HasDiffX1()) { - raw[0] += item.GetDiffX1(); - } - if (item.HasRawX2()) { - raw[1] = item.GetRawX2(); - } else if (item.HasDiffX2()) { - raw[1] += item.GetDiffX2(); - } - if (item.HasRawX3()) { - raw[2] = item.GetRawX3(); - } else if (item.HasDiffX3()) { - raw[2] += item.GetDiffX3(); - } - const TLogoBlobID id(raw); - state.LastBlob.emplace(id); - if (!LastProcessedBlob || *LastProcessedBlob <= *state.LastBlob) { - state.Blobs.emplace_back(item, id); - } - } - - if (wasExhausted && !state.Exhausted()) { - Heap.push_back(&state); - std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare()); - } - - if (record.BlocksSize() + record.BarriersSize() + record.BlobsSize() == 0 && record.GetStatus() == NKikimrProto::OK) { - state.Finished = true; - } else if (state.Exhausted()) { // still no records; for example, when all were skipped - return IssueAssimilateCmdToVDisk(orderNumber); - } - - Merge(); - } - - void TFetchMachine::Merge() { - if (AssimilateDataInFlight) { - return; - } - - auto ev = std::make_unique<TEvAssimilatedData>(); - - const TBlobStorageGroupInfo::TTopology *top = &Info->GetTopology(); - TBlobStorageGroupInfo::TGroupVDisks mergeableDisks(top); - for (ui32 i = 0; i < PerDiskState.size(); ++i) { - TPerDiskState& state = PerDiskState[i]; - if (!state.Exhausted() || state.Finished) { - mergeableDisks |= {top, top->GetVDiskId(i)}; - } else if (state.RequestInFlight) { - return; - } - } - - static constexpr ui64 MaxBlock = Max<ui64>(); - static constexpr std::tuple<ui64, ui8> MaxBarrier = std::make_tuple(Max<ui64>(), Max<ui8>()); - static const TLogoBlobID MaxBlob = Max<TLogoBlobID>(); - - bool quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(mergeableDisks); - while (quorumCorrect) { - if (Heap.empty()) { - LastProcessedBlock.emplace(MaxBlock); - LastProcessedBarrier.emplace(MaxBarrier); - LastProcessedBlob.emplace(MaxBlob); - break; - } - - std::optional<TBlock> block; - std::optional<TBarrier> barrier; - std::optional<TBlob> blob; - TSubgroupPartLayout layout; - - auto callback = [&](auto&& value, ui32 orderNumber) { - using T = std::decay_t<decltype(value)>; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "AssimilatedItem", (GroupId, Info->GroupID), (OrderNumber, orderNumber), - (Value, value)); - if constexpr (std::is_same_v<T, TBlock>) { - Y_VERIFY(!LastProcessedBlock || *LastProcessedBlock < value.TabletId); - if (block) { - block->Merge(value); - } else { - block.emplace(std::move(value)); - } - } else if constexpr (std::is_same_v<T, TBarrier>) { - Y_VERIFY(!LastProcessedBarrier || *LastProcessedBarrier < std::make_tuple(value.TabletId, value.Channel)); - if (barrier) { - barrier->Merge(value); - } else { - barrier.emplace(std::move(value)); - } - } else if constexpr (std::is_same_v<T, TBlob>) { - Y_VERIFY(!LastProcessedBlob || *LastProcessedBlob < value.Id); - if (blob) { - blob->Merge(value); - } else { - blob.emplace(std::move(value)); - } - - auto local = TIngress(value.Ingress).LocalParts(top->GType); - for (ui8 partIdx = local.FirstPosition(); partIdx != local.GetSize(); partIdx = local.NextPosition(partIdx)) { - layout.AddItem(top->GetIdxInSubgroup(top->GetVDiskId(orderNumber), blob->Id.Hash()), partIdx, top->GType); - } - } else { - static_assert(TDependentFalse<T>, "incorrect case"); - } - }; - - TPerDiskState& head = *Heap.front(); - auto key = head.FirstKey(); - while (!Heap.empty() && Heap.front()->FirstKey() == key) { - std::pop_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare()); - const ui32 orderNumber = Heap.back() - PerDiskState.data(); - TPerDiskState& item = PerDiskState[orderNumber]; - item.PopFirstItem(std::bind(callback, std::placeholders::_1, orderNumber)); - if (item.Exhausted()) { - if (!item.Finished) { - // data not yet received -- ask for it - IssueAssimilateCmdToVDisk(orderNumber); - // mark disk temporarily unavailable - mergeableDisks -= {top, top->GetVDiskId(orderNumber)}; - quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(mergeableDisks); - } - // remove item from the heap -- it has no valid data to process - Heap.pop_back(); - } else { - // more items to do - std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare()); - } - } - - if (block) { - LastProcessedBlock.emplace(block->TabletId); - ev->Blocks.push_back(std::move(*block)); - } else if (barrier) { - LastProcessedBlock.emplace(MaxBlock); - LastProcessedBarrier.emplace(barrier->TabletId, barrier->Channel); - ev->Barriers.push_back(std::move(*barrier)); - } else if (blob) { - blob->Keep = TIngress(blob->Ingress).KeepUnconditionally(TIngress::IngressMode(top->GType)); - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT48, "assimilated blob", (GroupId, Info->GroupID), (Id, blob->Id), - (Layout, layout.ToString(top->GType)), (Keep, blob->Keep)); - LastProcessedBlock.emplace(MaxBlock); - LastProcessedBarrier.emplace(MaxBarrier); - LastProcessedBlob.emplace(blob->Id); - ev->Blobs.push_back(std::move(*blob)); - } else { - Y_FAIL(); - } - - if (ev->Blocks.size() + ev->Barriers.size() + ev->Blobs.size() == 10'000) { - break; - } - } - - ev->BlocksFinished = LastProcessedBlock == MaxBlock; - ev->BarriersFinished = LastProcessedBarrier == MaxBarrier; - ev->BlobsFinished = LastProcessedBlob == MaxBlob; - Y_VERIFY(ev->BlocksFinished >= ev->BarriersFinished && ev->BarriersFinished >= ev->BlobsFinished); - - // store the assimilator state - TStringOutput stream(ev->AssimilatorState); - - Save(&stream, ui8((LastProcessedBlock ? TStateMask::Block : 0) - | (LastProcessedBarrier ? TStateMask::Barrier : 0) - | (LastProcessedBlob ? TStateMask::Blob : 0))); - if (LastProcessedBlock) { - Save(&stream, *LastProcessedBlock); - } - if (LastProcessedBarrier) { - Save(&stream, *LastProcessedBarrier); - } - if (LastProcessedBlob) { - Save(&stream, *LastProcessedBlob); - } - - Self.Send(BlobDepotId, ev.release()); - - AssimilateDataInFlight = true; - } - - void TFetchMachine::HandleAssimilateDataConfirm() { - Y_VERIFY(AssimilateDataInFlight); - AssimilateDataInFlight = false; - Merge(); - } - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.h b/ydb/core/blob_depot/assimilator_fetch_machine.h deleted file mode 100644 index 225c751d9e..0000000000 --- a/ydb/core/blob_depot/assimilator_fetch_machine.h +++ /dev/null @@ -1,135 +0,0 @@ -#pragma once - -#include "defs.h" -#include "blob_depot_tablet.h" - -namespace NKikimr::NBlobDepot { - - class TBlobDepot::TGroupAssimilatorFetchMachine { - TActorIdentity Self; - TIntrusivePtr<TBlobStorageGroupInfo> Info; - TActorId BlobDepotId; - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Data processing - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - struct TPerDiskState { - std::deque<TBlock> Blocks; - std::optional<ui64> LastBlock; - std::deque<TBarrier> Barriers; - std::optional<std::tuple<ui64, ui8>> LastBarrier; - std::deque<TBlob> Blobs; - std::optional<TLogoBlobID> LastBlob; - bool Finished = false; - bool RequestInFlight = false; - - ui64 FirstBlock() const { - Y_VERIFY_DEBUG(!Blocks.empty()); - return Blocks.front().TabletId; - } - - std::tuple<ui64, ui8> FirstBarrier() const { - Y_VERIFY_DEBUG(!Barriers.empty()); - const auto& barrier = Barriers.front(); - return {barrier.TabletId, barrier.Channel}; - } - - TLogoBlobID FirstBlob() const { - Y_VERIFY_DEBUG(!Blobs.empty()); - return Blobs.front().Id; - } - - std::variant<ui64, std::tuple<ui64, ui8>, TLogoBlobID> FirstKey() const { - if (!Blocks.empty()) { - return FirstBlock(); - } else if (!Barriers.empty()) { - return FirstBarrier(); - } else if (!Blobs.empty()) { - return FirstBlob(); - } else { - Y_FAIL(); - } - } - - template<typename T> - void PopFirstItem(T&& callback) { - if (!Blocks.empty()) { - callback(Blocks.front()); - Blocks.pop_front(); - } else if (!Barriers.empty()) { - callback(Barriers.front()); - Barriers.pop_front(); - } else if (!Blobs.empty()) { - callback(Blobs.front()); - Blobs.pop_front(); - } else { - Y_FAIL(); - } - } - - struct THeapCompare { - bool operator ()(const TPerDiskState *x, const TPerDiskState *y) const { - if (!x->Blocks.empty() && !y->Blocks.empty()) { - return x->FirstBlock() > y->FirstBlock(); - } else if (x->Blocks.empty() != y->Blocks.empty()) { - return x->Blocks.empty() > y->Blocks.empty(); - } else if (!x->Barriers.empty() && !y->Barriers.empty()) { - return x->FirstBarrier() > y->FirstBarrier(); - } else if (x->Barriers.empty() != y->Barriers.empty()) { - return x->Barriers.empty() > y->Barriers.empty(); - } else if (!x->Blobs.empty() && !y->Blobs.empty()) { - return x->FirstBlob() > y->FirstBlob(); - } else { - return x->Barriers.empty() > y->Barriers.empty(); - } - } - }; - - bool Exhausted() const { - return Blocks.empty() && Barriers.empty() && Blobs.empty(); - } - }; - - std::vector<TPerDiskState> PerDiskState; - std::vector<TPerDiskState*> Heap; - std::optional<ui64> LastProcessedBlock; - std::optional<std::tuple<ui64, ui8>> LastProcessedBarrier; - std::optional<TLogoBlobID> LastProcessedBlob; - - struct TRequestInFlight { - ui32 OrderNumber; - }; - - using TRequestsInFlight = THashMap<ui64, TRequestInFlight>; - - struct TNodeInfo { - std::vector<ui32> OrderNumbers; - THashSet<TRequestsInFlight::value_type*> RequestsInFlight; - }; - - ui32 LastRequestId = 0; - TRequestsInFlight RequestsInFlight; - THashMap<ui32, TNodeInfo> Nodes; - - bool AssimilateDataInFlight = false; - - public: - TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info, - TActorId blobDepotId, const std::optional<TString>& assimilatorState); - void Handle(TAutoPtr<IEventHandle>& ev); - void OnPassAway(); - - private: - void IssueAssimilateCmdToVDisk(ui32 orderNumber); - void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev); - void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev); - void Handle(TEvents::TEvUndelivered::TPtr ev); - ui32 EndRequest(ui64 id); - void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev); - void HandleAssimilateDataConfirm(); - void Merge(); - void MergeDone(); - }; - -} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp index e48168aac8..f1ca62ccd2 100644 --- a/ydb/core/blob_depot/blob_depot.cpp +++ b/ydb/core/blob_depot/blob_depot.cpp @@ -46,9 +46,6 @@ namespace NKikimr::NBlobDepot { hFunc(TEvTabletPipe::TEvServerConnected, Handle); hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); - fFunc(TEvents::TSystem::Gone, HandleGone); - hFunc(TEvAssimilatedData, Handle); - default: if (!HandleDefaultEvents(ev, ctx)) { Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); @@ -61,7 +58,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::PassAway() { - for (const TActorId& actorId : {RunningGroupAssimilator, CopierId}) { + for (const TActorId& actorId : {GroupAssimilatorId}) { if (actorId) { TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), nullptr, 0)); } diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index e9e7be234e..c3529148b1 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -16,140 +16,9 @@ namespace NKikimr::NBlobDepot { struct TEvPrivate { enum { EvCheckExpiredAgents = EventSpaceBegin(TEvents::ES_PRIVATE), - EvAssimilatedData, - EvAssimilatedDataConfirm, }; }; - struct TBlock { - ui64 TabletId; - ui32 BlockedGeneration; - ui64 IssuerGuid = 0; - - TBlock() = default; - - TBlock(const NKikimrBlobStorage::TEvVAssimilateResult::TBlock& item) - : TabletId(item.GetTabletId()) - , BlockedGeneration(item.GetBlockedGeneration()) - {} - - template<typename T> - static TBlock FromRow(T&& row) { - TBlock block; - block.TabletId = row.template GetValue<Schema::Blocks::TabletId>(); - block.BlockedGeneration = row.template GetValue<Schema::Blocks::BlockedGeneration>(); - block.IssuerGuid = row.template GetValue<Schema::Blocks::IssuerGuid>(); - return block; - } - - void Merge(TBlock& other) { - Y_VERIFY_DEBUG(other.TabletId == TabletId); - BlockedGeneration = Max(BlockedGeneration, other.BlockedGeneration); - } - - TString ToString() const { - return TStringBuilder() << "{" << TabletId << ":" << BlockedGeneration << "}"; - } - }; - - struct TBarrier { - struct TValue { - TGenStep GenCtr; - TGenStep Collect; - - TValue() = default; - - TValue(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier::TValue& value) - : GenCtr(value.GetRecordGeneration(), value.GetPerGenerationCounter()) - , Collect(value.GetCollectGeneration(), value.GetCollectStep()) - {} - - void Merge(TValue& other) { - if (GenCtr < other.GenCtr) { - *this = other; - } - } - - TString ToString() const { - return TStringBuilder() << "{" << GenCtr.ToString() << "=>" << Collect.ToString() << "}"; - } - }; - - ui64 TabletId; - ui8 Channel; - TValue Hard; - TValue Soft; - - TBarrier() = default; - - TBarrier(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier& item) - : TabletId(item.GetTabletId()) - , Channel(item.GetChannel()) - , Hard(item.HasHard() ? TValue(item.GetHard()) : TValue()) - , Soft(item.HasSoft() ? TValue(item.GetSoft()) : TValue()) - {} - - template<typename TRow> - static TBarrier FromRow(TRow&& row) { - using T = Schema::Barriers; - TBarrier barrier; - barrier.TabletId = row.template GetValue<T::TabletId>(); - barrier.Channel = row.template GetValue<T::Channel>(); - if (row.template HaveValue<T::HardGenCtr>() && row.template HaveValue<T::Hard>()) { - barrier.Hard.GenCtr = TGenStep(row.template GetValue<T::HardGenCtr>()); - barrier.Hard.Collect = TGenStep(row.template GetValue<T::Hard>()); - } - if (row.template HaveValue<T::SoftGenCtr>() && row.template HaveValue<T::Soft>()) { - barrier.Soft.GenCtr = TGenStep(row.template GetValue<T::SoftGenCtr>()); - barrier.Soft.Collect = TGenStep(row.template GetValue<T::Soft>()); - } - return barrier; - } - - void Merge(TBarrier& other) { - Y_VERIFY_DEBUG(TabletId == other.TabletId && Channel == other.Channel); - Hard.Merge(other.Hard); - Soft.Merge(other.Soft); - } - - TString ToString() const { - return TStringBuilder() << "{" << TabletId << ":" << int(Channel) << "@" << Hard.ToString() << "/" - << Soft.ToString() << "}"; - } - }; - - struct TBlob { - TLogoBlobID Id; - ui64 Ingress; - bool Keep = false; - - TBlob() = default; - - TBlob(const NKikimrBlobStorage::TEvVAssimilateResult::TBlob& item, const TLogoBlobID& id) - : Id(id) - , Ingress(item.GetIngress()) - {} - - void Merge(TBlob& other) { - Y_VERIFY_DEBUG(Id == other.Id); - Ingress |= other.Ingress; - } - - TString ToString() const { - return TStringBuilder() << "{" << Id.ToString() << "/" << Ingress << "}"; - } - }; - - struct TEvAssimilatedData : TEventLocal<TEvAssimilatedData, TEvPrivate::EvAssimilatedData> { - std::deque<TBlock> Blocks; - bool BlocksFinished = false; - std::deque<TBarrier> Barriers; - bool BarriersFinished = false; - std::deque<TBlob> Blobs; - bool BlobsFinished = false; - TString AssimilatorState; - }; - public: TBlobDepot(TActorId tablet, TTabletStorageInfo *info); ~TBlobDepot(); @@ -337,20 +206,13 @@ namespace NKikimr::NBlobDepot { struct TToken {}; std::shared_ptr<TToken> Token = std::make_shared<TToken>(); - TActorId RunningGroupAssimilator; - TActorId CopierId; + TActorId GroupAssimilatorId; EDecommitState DecommitState = EDecommitState::Default; std::optional<TString> AssimilatorState; class TGroupAssimilator; - class TGroupAssimilatorFetchMachine; - - class TGroupAssimilatorCopierActor; void StartGroupAssimilator(); - void HandleGone(TAutoPtr<IEventHandle> ev); - void Handle(TEvAssimilatedData::TPtr ev); - void ProcessAssimilatedData(TEvAssimilatedData& msg); }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 5cd5dfb685..a2604fdd80 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -190,20 +190,21 @@ namespace NKikimr::NBlobDepot { ) }; - void TBlobDepot::TBlocksManager::AddBlockOnLoad(const TBlobDepot::TBlock& block) { - Blocks[block.TabletId] = { - .BlockedGeneration = block.BlockedGeneration, - .IssuerGuid = block.IssuerGuid, + void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration, ui64 issuerGuid) { + Blocks[tabletId] = { + .BlockedGeneration = blockedGeneration, + .IssuerGuid = issuerGuid, }; } - void TBlobDepot::TBlocksManager::AddBlockOnDecommit(const TBlobDepot::TBlock& block, NTabletFlatExecutor::TTransactionContext& txc) { - AddBlockOnLoad(block); + void TBlobDepot::TBlocksManager::AddBlockOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlock& block, + NTabletFlatExecutor::TTransactionContext& txc) { + AddBlockOnLoad(block.TabletId, block.BlockedGeneration, 0); NIceDb::TNiceDb db(txc.DB); db.Table<Schema::Blocks>().Key(block.TabletId).Update( NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(block.BlockedGeneration), - NIceDb::TUpdate<Schema::Blocks::IssuerGuid>(block.IssuerGuid) + NIceDb::TUpdate<Schema::Blocks::IssuerGuid>(0) ); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT44, "adding block through decommission", (Id, Self->GetLogId()), (Block, block)); diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h index 5e0ed55324..658fafba12 100644 --- a/ydb/core/blob_depot/blocks.h +++ b/ydb/core/blob_depot/blocks.h @@ -39,8 +39,8 @@ namespace NKikimr::NBlobDepot { : Self(self) {} - void AddBlockOnLoad(const TBlobDepot::TBlock& block); - void AddBlockOnDecommit(const TBlobDepot::TBlock& block, NTabletFlatExecutor::TTransactionContext& txc); + void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration, ui64 issuerGuid); + void AddBlockOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlock& block, NTabletFlatExecutor::TTransactionContext& txc); void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid, std::unique_ptr<IEventHandle> response); void Handle(TEvBlobDepot::TEvBlock::TPtr ev); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index fc69cd33e3..937daf5abe 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -25,7 +25,8 @@ namespace NKikimr::NBlobDepot { PutKey(std::move(key), TValue(std::move(proto))); } - void TData::AddDataOnDecommit(const TBlobDepot::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc) { + void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, + NTabletFlatExecutor::TTransactionContext& txc) { bool underSoft, underHard; Self->BarrierServer->GetBlobBarrierRelation(blob.Id, &underSoft, &underHard); if (underHard || (underSoft && !blob.Keep)) { diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 797e6a5ac8..e6f83d2056 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -335,7 +335,7 @@ namespace NKikimr::NBlobDepot { TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id); void AddDataOnLoad(TKey key, TString value); - void AddDataOnDecommit(const TBlobDepot::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc); + void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc); void AddTrashOnLoad(TLogoBlobID id); void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep); diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 773520084f..e961ca960e 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -90,7 +90,7 @@ namespace NKikimr { if (ev.InterconnectSession) { handle->Rewrite(TEvInterconnect::EvForward, ev.InterconnectSession); } - return std::make_pair(std::move(handle), record); + return std::make_tuple(std::move(handle), record); } }; diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 8c378e712e..34f057652c 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -21,7 +21,7 @@ namespace NKikimr::NBlobDepot { public: TTxCollectGarbage(TBlobDepot *self, ui64 tabletId, ui8 channel, ui32 keepIndex = 0, ui32 doNotKeepIndex = 0) : TTransactionBase(self) - , Barrier(Self->BarrierServer->Barriers[std::make_pair(tabletId, channel)]) + , Barrier(Self->BarrierServer->Barriers[std::make_tuple(tabletId, channel)]) , Request(Barrier.ProcessingQ.front()) , TabletId(tabletId) , Channel(channel) @@ -67,7 +67,7 @@ namespace NKikimr::NBlobDepot { } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) { Error = "Generation/PerGenerationCounter are not set"; } else { - const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel()); auto& barriers = Self->BarrierServer->Barriers; if (const auto it = barriers.find(key); it != barriers.end()) { // extract existing barrier record @@ -146,7 +146,7 @@ namespace NKikimr::NBlobDepot { return false; } - const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel()); auto& barrier = Self->BarrierServer->Barriers[key]; TGenStep& barrierGenCtr = record.GetHard() ? barrier.HardGenCtr : barrier.SoftGenCtr; TGenStep& barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; @@ -175,45 +175,50 @@ namespace NKikimr::NBlobDepot { } }; - void TBlobDepot::TBarrierServer::AddBarrierOnLoad(const TBlobDepot::TBarrier& barrier) { - Barriers[std::make_pair(barrier.TabletId, barrier.Channel)] = { - .SoftGenCtr = barrier.Soft.GenCtr, - .Soft = barrier.Soft.Collect, - .HardGenCtr = barrier.Hard.GenCtr, - .Hard = barrier.Hard.Collect, + void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, + TGenStep hardGenCtr, TGenStep hard) { + Barriers[std::make_tuple(tabletId, channel)] = { + .SoftGenCtr = softGenCtr, + .Soft = soft, + .HardGenCtr = hardGenCtr, + .Hard = hard, }; } - void TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TBlobDepot::TBarrier& barrier, + void TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); - const auto key = std::make_pair(barrier.TabletId, barrier.Channel); + const auto key = std::make_tuple(barrier.TabletId, barrier.Channel); auto& current = Barriers[key]; #define DO(TYPE) \ - if (current.TYPE##GenCtr < barrier.TYPE.GenCtr) { \ - if (current.TYPE <= barrier.TYPE.Collect) { \ - current.TYPE##GenCtr = barrier.TYPE.GenCtr; \ - current.TYPE = barrier.TYPE.Collect; \ - db.Table<Schema::Barriers>().Key(barrier.TabletId, barrier.Channel).Update( \ - NIceDb::TUpdate<Schema::Barriers::TYPE##GenCtr>(ui64(barrier.TYPE.GenCtr)), \ - NIceDb::TUpdate<Schema::Barriers::TYPE>(ui64(barrier.TYPE.Collect)) \ - ); \ - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replacing " #TYPE " barrier through decommission", \ - (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \ - (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \ - (Barrier, barrier)); \ - } else { \ - STLOG(PRI_ERROR, BLOB_DEPOT, BDT36, "decreasing " #TYPE " barrier through decommission", \ + { \ + const TGenStep barrierGenCtr(barrier.TYPE.RecordGeneration, barrier.TYPE.PerGenerationCounter); \ + const TGenStep barrierCollect(barrier.TYPE.CollectGeneration, barrier.TYPE.CollectStep); \ + if (current.TYPE##GenCtr < barrierGenCtr) { \ + if (current.TYPE <= barrierCollect) { \ + current.TYPE##GenCtr = barrierGenCtr; \ + current.TYPE = barrierCollect; \ + db.Table<Schema::Barriers>().Key(barrier.TabletId, barrier.Channel).Update( \ + NIceDb::TUpdate<Schema::Barriers::TYPE##GenCtr>(ui64(barrierGenCtr)), \ + NIceDb::TUpdate<Schema::Barriers::TYPE>(ui64(barrierCollect)) \ + ); \ + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replacing " #TYPE " barrier through decommission", \ + (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \ + (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \ + (Barrier, barrier)); \ + } else { \ + STLOG(PRI_ERROR, BLOB_DEPOT, BDT36, "decreasing " #TYPE " barrier through decommission", \ + (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \ + (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \ + (Barrier, barrier)); \ + } \ + } else if (current.TYPE##GenCtr == barrierGenCtr && current.TYPE != barrierCollect) { \ + STLOG(PRI_ERROR, BLOB_DEPOT, BDT43, "barrier value mismatch through decommission", \ (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \ (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \ (Barrier, barrier)); \ } \ - } else if (current.TYPE##GenCtr == barrier.TYPE.GenCtr && current.TYPE != barrier.TYPE.Collect) { \ - STLOG(PRI_ERROR, BLOB_DEPOT, BDT43, "barrier value mismatch through decommission", \ - (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \ - (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \ - (Barrier, barrier)); \ } DO(Hard) @@ -222,7 +227,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { const auto& record = ev->Get()->Record; - const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel()); auto& barrier = Barriers[key]; barrier.ProcessingQ.emplace_back(ev.Release()); if (Self->Data->IsLoaded() && barrier.ProcessingQ.size() == 1) { @@ -231,12 +236,12 @@ namespace NKikimr::NBlobDepot { } bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const { - const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel())); + const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel())); return it == Barriers.end() || TGenStep(id) > Max(it->second.Soft, it->second.Hard); } void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const { - const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel())); + const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel())); const TGenStep genStep(id); *underSoft = it == Barriers.end() ? false : genStep <= it->second.Soft; *underHard = it == Barriers.end() ? false : genStep <= it->second.Hard; @@ -245,7 +250,8 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::TBarrierServer::OnDataLoaded() { for (auto& [key, barrier] : Barriers) { if (!barrier.ProcessingQ.empty()) { - Self->Execute(std::make_unique<TTxCollectGarbage>(Self, key.first, key.second)); + const auto& [tabletId, channel] = key; + Self->Execute(std::make_unique<TTxCollectGarbage>(Self, tabletId, channel)); } } } diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h index ebaa1fb589..d12c4abb57 100644 --- a/ydb/core/blob_depot/garbage_collection.h +++ b/ydb/core/blob_depot/garbage_collection.h @@ -16,7 +16,7 @@ namespace NKikimr::NBlobDepot { std::deque<std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>> ProcessingQ; }; - THashMap<std::pair<ui64, ui8>, TBarrier> Barriers; + THashMap<std::tuple<ui64, ui8>, TBarrier> Barriers; private: class TTxCollectGarbage; @@ -26,8 +26,8 @@ namespace NKikimr::NBlobDepot { : Self(self) {} - void AddBarrierOnLoad(const TBlobDepot::TBarrier& barrier); - void AddBarrierOnDecommit(const TBlobDepot::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc); + void AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, TGenStep hardGenCtr, TGenStep hard); + void AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc); void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); bool CheckBlobForBarrier(TLogoBlobID id) const; void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const; @@ -36,7 +36,8 @@ namespace NKikimr::NBlobDepot { template<typename TCallback> void Enumerate(TCallback&& callback) { for (const auto& [key, value] : Barriers) { - callback(key.first, key.second, value.SoftGenCtr, value.Soft, value.HardGenCtr, value.Hard); + const auto& [tabletId, channel] = key; + callback(tabletId, channel, value.SoftGenCtr, value.Soft, value.HardGenCtr, value.Hard); } } }; diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index 10cd84db33..60701db398 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -46,7 +46,11 @@ namespace NKikimr::NBlobDepot { return false; } while (table.IsValid()) { - Self->BlocksManager->AddBlockOnLoad(TBlock::FromRow(table)); + Self->BlocksManager->AddBlockOnLoad( + table.GetValue<Schema::Blocks::TabletId>(), + table.GetValue<Schema::Blocks::BlockedGeneration>(), + table.GetValue<Schema::Blocks::IssuerGuid>() + ); if (!table.Next()) { return false; } @@ -60,7 +64,14 @@ namespace NKikimr::NBlobDepot { return false; } while (table.IsValid()) { - Self->BarrierServer->AddBarrierOnLoad(TBarrier::FromRow(table)); + Self->BarrierServer->AddBarrierOnLoad( + table.GetValue<Schema::Barriers::TabletId>(), + table.GetValue<Schema::Barriers::Channel>(), + TGenStep(table.GetValue<Schema::Barriers::SoftGenCtr>()), + TGenStep(table.GetValue<Schema::Barriers::Soft>()), + TGenStep(table.GetValue<Schema::Barriers::HardGenCtr>()), + TGenStep(table.GetValue<Schema::Barriers::Hard>()) + ); if (!table.Next()) { return false; } diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index fae3b54999..3eb87d2b05 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot { struct TChannelKind { std::array<ui8, 256> ChannelToIndex; - std::vector<std::pair<ui8, ui32>> ChannelGroups; + std::vector<std::tuple<ui8, ui32>> ChannelGroups; }; #pragma pack(push, 1) diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp index e768d2d841..45f7d9166d 100644 --- a/ydb/core/blobstorage/backpressure/queue.cpp +++ b/ydb/core/blobstorage/backpressure/queue.cpp @@ -155,6 +155,7 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re TYPE_CASE(TEvBlobStorage::TEvVCollectGarbage) TYPE_CASE(TEvBlobStorage::TEvVGetBarrier) TYPE_CASE(TEvBlobStorage::TEvVStatus) + TYPE_CASE(TEvBlobStorage::TEvVAssimilate) #undef TYPE_CASE default: return Sprintf("0x%08" PRIx32, item.Event.GetType()); diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp index 43625bd5ff..1156f3180e 100644 --- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp +++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp @@ -234,6 +234,7 @@ private: case TEvBlobStorage::EvVGetResult: case TEvBlobStorage::EvVStatusResult: + case TEvBlobStorage::EvVAssimilateResult: expected = InterconnectChannel; break; @@ -458,6 +459,7 @@ private: ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, false, false)); Queue.DrainQueue(status, TStringBuilder() << "BS_QUEUE: " << errorReason, ctx); DrainStatus(status, ctx); + DrainAssimilate(status, errorReason, ctx); break; } State = EState::INITIAL; @@ -668,6 +670,48 @@ private: ctx.Send(sender, response.release(), 0, cookie); } + THashMap<ui64, std::pair<TActorId, ui64>> AssimilateRequests; + ui64 NextAssimilateRequestCookie = 1; + + void Handle(TEvBlobStorage::TEvVAssimilate::TPtr& ev, const TActorContext& ctx) { + if (IsReady()) { + const ui64 id = NextAssimilateRequestCookie++; + ctx.Send(RemoteVDisk, ev->Release().Release(), IEventHandle::MakeFlags(InterconnectChannel, + IEventHandle::FlagTrackDelivery), id, std::move(ev->TraceId)); + AssimilateRequests.emplace(id, std::make_pair(ev->Sender, ev->Cookie)); + } else { + ctx.Send(ev->Sender, new TEvBlobStorage::TEvVAssimilateResult(NKikimrProto::NOTREADY, "no connection", + VDiskId), 0, ev->Cookie); + } + } + + void DrainAssimilate(NKikimrProto::EReplyStatus status, TString errorReason, const TActorContext& ctx) { + for (const auto& [id, ep] : std::exchange(AssimilateRequests, {})) { + const auto& [sender, cookie] = ep; + ctx.Send(sender, new TEvBlobStorage::TEvVAssimilateResult(status, errorReason, VDiskId), 0, cookie); + } + } + + void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr& ev, const TActorContext& ctx) { + if (!CheckReply(ev, ctx)) { + return; + } + if (const auto it = AssimilateRequests.find(ev->Cookie); it != AssimilateRequests.end()) { + const auto& [sender, cookie] = it->second; + ctx.Send(sender, ev->Release().Release(), 0, cookie, std::move(ev->TraceId)); + AssimilateRequests.erase(it); + } else { + const TString& message = TStringBuilder() << "unexpected TEvVAssimilateResult received" + << " Cookie# " << ev->Cookie + << " Sender# " << ev->Sender + << " Msg# " << ev->Get()->ToString() + << " VDiskId# " << VDiskId; + Y_VERIFY_DEBUG(false, "%s", message.data()); + QLOG_CRIT_S("BSQ39", message); + } + ResetWatchdogTimer(ctx.Now()); + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // REQUEST TIME TRACKING AND REPORTING SYSTEM ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////0 @@ -825,6 +869,8 @@ private: XX(TEvBlobStorage::EvVReadyNotify, EvVReadyNotify) \ XX(TEvBlobStorage::EvVStatus, EvVStatus) \ XX(TEvBlobStorage::EvVStatusResult, EvVStatusResult) \ + XX(TEvBlobStorage::EvVAssimilate, EvVAssimilate) \ + XX(TEvBlobStorage::EvVAssimilateResult, EvVAssimilateResult) \ XX(TEvBlobStorage::EvRequestProxyQueueState, EvRequestProxyQueueState) \ XX(TEvBlobStorage::EvVWindowChange, EvVWindowChange) \ XX(TEvInterconnect::EvNodeConnected, EvNodeConnected) \ @@ -896,6 +942,8 @@ private: HFunc(TEvBlobStorage::TEvVStatus, Handle) HFunc(TEvBlobStorage::TEvVStatusResult, Handle) + HFunc(TEvBlobStorage::TEvVAssimilate, Handle) + HFunc(TEvBlobStorage::TEvVAssimilateResult, Handle) HFunc(TEvRequestProxyQueueState, Handle) diff --git a/ydb/core/blobstorage/dsproxy/CMakeLists.txt b/ydb/core/blobstorage/dsproxy/CMakeLists.txt index ec25b93606..277b5f44e6 100644 --- a/ydb/core/blobstorage/dsproxy/CMakeLists.txt +++ b/ydb/core/blobstorage/dsproxy/CMakeLists.txt @@ -28,6 +28,7 @@ target_sources(core-blobstorage-dsproxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_stat.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp diff --git a/ydb/core/blobstorage/dsproxy/defs.h b/ydb/core/blobstorage/dsproxy/defs.h index f2c837b4d6..0daa207e3b 100644 --- a/ydb/core/blobstorage/dsproxy/defs.h +++ b/ydb/core/blobstorage/dsproxy/defs.h @@ -20,5 +20,6 @@ #include <util/generic/deque.h> #include <util/generic/hash_set.h> #include <util/generic/map.h> +#include <util/generic/overloaded.h> #include <util/string/escape.h> #include <library/cpp/monlib/service/pages/templates.h> diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 5f1fd27e53..80095da114 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -290,6 +290,7 @@ public: CHECK(TEvVCollectGarbageResult); CHECK(TEvVGetBarrierResult); CHECK(TEvVStatusResult); + CHECK(TEvVAssimilateResult); #undef CHECK case TEvBlobStorage::EvProxySessionsState: { @@ -331,7 +332,7 @@ public: template<typename T> void SendToQueue(std::unique_ptr<T> event, ui64 cookie, bool timeStatsEnabled = false) { - if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) { + if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus> && !std::is_same_v<T, TEvBlobStorage::TEvVAssimilate>) { event->MessageRelevanceTracker = MessageRelevanceTracker; } const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span.GetTraceId(), @@ -445,6 +446,7 @@ public: XX(CollectGarbage) XX(Status) XX(Patch) + XX(Assimilate) default: Y_FAIL(); #undef XX @@ -631,6 +633,11 @@ IActor* CreateBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGrou const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev, ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); +IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info, + const TIntrusivePtr<TGroupQueues>& state, const TActorId& source, + const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev, + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters); + IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon); IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp new file mode 100644 index 0000000000..d8fe73077d --- /dev/null +++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp @@ -0,0 +1,428 @@ +#include "dsproxy.h" + +namespace NKikimr { + +class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor<TBlobStorageGroupAssimilateRequest> { + std::optional<ui64> SkipBlocksUpTo; + std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo; + std::optional<TLogoBlobID> SkipBlobsUpTo; + + struct TBlock : TEvBlobStorage::TEvAssimilateResult::TBlock { + TBlock(const TBlock&) = default; + + TBlock(ui64 key, const NKikimrBlobStorage::TEvVAssimilateResult::TBlock& msg) { + Y_VERIFY_DEBUG(msg.HasBlockedGeneration()); + TabletId = key; + BlockedGeneration = msg.GetBlockedGeneration(); + } + + bool Merge(const TBlock& other) { + if (TabletId == other.TabletId) { + BlockedGeneration = Max(BlockedGeneration, other.BlockedGeneration); + return true; + } else { + return false; + } + } + + bool GoesBeforeThan(const TBlock& other) const { + return TabletId < other.TabletId; + } + }; + + struct TBarrier : TEvBlobStorage::TEvAssimilateResult::TBarrier { + TBarrier(const TBarrier&) = default; + + TBarrier(std::tuple<ui64, ui8> key, const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier& msg) { + std::tie(TabletId, Channel) = key; + + auto parseValue = [](auto& value, const auto& pb) { + Y_VERIFY_DEBUG(pb.HasRecordGeneration() && pb.HasPerGenerationCounter() && pb.HasCollectGeneration() && + pb.HasCollectStep()); + value = { + .RecordGeneration = pb.GetRecordGeneration(), + .PerGenerationCounter = pb.GetPerGenerationCounter(), + .CollectGeneration = pb.GetCollectGeneration(), + .CollectStep = pb.GetCollectStep(), + }; + }; + if (msg.HasSoft()) { + parseValue(Soft, msg.GetSoft()); + } + if (msg.HasHard()) { + parseValue(Hard, msg.GetHard()); + } + } + + bool Merge(const TBarrier& other) { + if (TabletId == other.TabletId && Channel == other.Channel) { + Merge(Soft, other.Soft); + Merge(Hard, other.Hard); + return true; + } else { + return false; + } + } + + static void Merge(TValue& to, const TValue& from) { + if (std::tie(to.RecordGeneration, to.PerGenerationCounter) < std::tie(from.RecordGeneration, from.PerGenerationCounter)) { + to = from; + } + } + + bool GoesBeforeThan(const TBarrier& other) const { + return std::tie(TabletId, Channel) < std::tie(other.TabletId, other.Channel); + } + }; + + struct TBlob : TEvBlobStorage::TEvAssimilateResult::TBlob { + ui64 Ingress; + + TBlob(const TBlob&) = default; + + TBlob(TLogoBlobID key, const NKikimrBlobStorage::TEvVAssimilateResult::TBlob& msg) { + Y_VERIFY_DEBUG(msg.HasIngress()); + Id = key; + Ingress = msg.GetIngress(); + Keep = DoNotKeep = false; + } + + bool Merge(const TBlob& other) { + if (Id == other.Id) { + Ingress |= other.Ingress; + return true; + } else { + return false; + } + } + + bool GoesBeforeThan(const TBlob& other) const { + return Id < other.Id; + } + }; + + using TItemVariant = std::variant<TBlock, TBarrier, TBlob>; + + struct TPerVDiskInfo { + std::optional<ui64> LastProcessedBlock; + std::optional<std::tuple<ui64, ui8>> LastProcessedBarrier; + std::optional<TLogoBlobID> LastProcessedBlob; + + std::deque<TBlock> Blocks; + std::deque<TBarrier> Barriers; + std::deque<TBlob> Blobs; + + struct TFinished { + friend bool operator <(const TFinished&, const TFinished&) { return false; } + }; + + std::variant<TBlock*, TBarrier*, TBlob*, TFinished> Next; + + void PushDataFromMessage(const NKikimrBlobStorage::TEvVAssimilateResult& msg, + const TBlobStorageGroupAssimilateRequest& self) { + Y_VERIFY(Blocks.empty() && Barriers.empty() && Blobs.empty()); + + std::array<ui64, 3> context = {0, 0, 0}; + + auto getKey = [&](const auto& item) { + using T = std::decay_t<decltype(item)>; + if constexpr (std::is_same_v<T, NKikimrBlobStorage::TEvVAssimilateResult::TBlock>) { + Y_VERIFY_DEBUG(item.HasTabletId()); + return ui64(item.GetTabletId()); + } else if constexpr (std::is_same_v<T, NKikimrBlobStorage::TEvVAssimilateResult::TBarrier>) { + Y_VERIFY_DEBUG(item.HasTabletId() && item.HasChannel()); + return std::tuple<ui64, ui8>(item.GetTabletId(), item.GetChannel()); + } else if constexpr (std::is_same_v<T, NKikimrBlobStorage::TEvVAssimilateResult::TBlob>) { + if (item.HasRawX1()) { + context[0] = item.GetRawX1(); + } else if (item.HasDiffX1()) { + context[0] += item.GetDiffX1(); + } + if (item.HasRawX2()) { + context[1] = item.GetRawX2(); + } else if (item.HasDiffX2()) { + context[1] += item.GetDiffX2(); + } + if (item.HasRawX3()) { + context[2] = item.GetRawX3(); + } else if (item.HasDiffX3()) { + context[2] += item.GetDiffX3(); + } + return TLogoBlobID(context.data()); + } else { + static_assert(TDependentFalse<T>, "unsupported protobuf"); + } + }; + + auto processItems = [&](const auto& items, auto& lastProcessed, auto& field, const auto& skipUpTo) { + for (const auto& item : items) { + const auto key = getKey(item); + Y_VERIFY(lastProcessed < key); + lastProcessed.emplace(key); + if (skipUpTo < key) { + field.emplace_back(key, item); + } + } + }; + + processItems(msg.GetBlocks(), LastProcessedBlock, Blocks, self.SkipBlocksUpTo); + processItems(msg.GetBarriers(), LastProcessedBarrier, Barriers, self.SkipBarriersUpTo); + processItems(msg.GetBlobs(), LastProcessedBlob, Blobs, self.SkipBlobsUpTo); + + AdjustNext(); + } + + void Consume() { + if (!Blocks.empty()) { + Blocks.pop_front(); + } else if (!Barriers.empty()) { + Barriers.pop_front(); + } else if (!Blobs.empty()) { + Blobs.pop_front(); + } else { + Y_UNREACHABLE(); + } + } + + void AdjustNext() { + if (!Blocks.empty()) { + Next.emplace<0>(&Blocks.front()); + } else if (!Barriers.empty()) { + Next.emplace<1>(&Barriers.front()); + } else if (!Blobs.empty()) { + Next.emplace<2>(&Blobs.front()); + } else { + Next.emplace<3>(); + } + } + + TItemVariant BeginMerge() const { + return std::visit([](auto value) -> TItemVariant { + if constexpr (std::is_same_v<decltype(value), TFinished>) { + Y_FAIL(); + } else { + return TItemVariant(std::in_place_type<std::decay_t<decltype(*value)>>, *value); + } + }, Next); + } + + bool Merge(TItemVariant *to) const { + return std::visit([to](auto value) -> bool { + if constexpr (std::is_same_v<decltype(value), TFinished>) { + Y_FAIL(); + } else if (auto *toItem = std::get_if<std::decay_t<decltype(*value)>>(to)) { + return toItem->Merge(*value); + } else { + return false; + } + }, Next); + } + + bool GoesBeforeThan(const TPerVDiskInfo& other) const { + return Next.index() != other.Next.index() + ? Next.index() < other.Next.index() + : std::visit([&other](auto&& left) { + using T = std::decay_t<decltype(left)>; + const auto& right = std::get<T>(other.Next); + if constexpr (std::is_same_v<T, TFinished>) { + return false; // always equal + } else { + return left->GoesBeforeThan(*right); + } + }, Next); + } + + bool Finished() const { + return Next.index() == 3; + } + + bool HasItemsToMerge() const { + return !Blocks.empty() || !Barriers.empty() || !Blobs.empty(); + } + + struct TCompare { + bool operator ()(const TPerVDiskInfo *x, const TPerVDiskInfo *y) const { + return y->GoesBeforeThan(*x); + } + }; + }; + + std::vector<TPerVDiskInfo> PerVDiskInfo; + std::vector<TPerVDiskInfo*> Heap; + + std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> Result; + ui32 RequestsInFlight = 0; + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType(){ + return NKikimrServices::TActivity::BS_GROUP_ASSIMILATE; + } + + static const auto& ActiveCounter(const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon) { + return mon->ActiveAssimilate; + } + + TBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info, + const TIntrusivePtr<TGroupQueues>& state, const TActorId& source, + const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev, ui64 cookie, + NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters) + : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), + NKikimrServices::BS_PROXY_ASSIMILATE, false, {}, now, storagePoolCounters, ev->RestartCounter, + "DSProxy.Assimilate") + , SkipBlocksUpTo(ev->SkipBlocksUpTo) + , SkipBarriersUpTo(ev->SkipBarriersUpTo) + , SkipBlobsUpTo(ev->SkipBlobsUpTo) + , PerVDiskInfo(info->GetTotalVDisksNum()) + , Result(new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::OK, {})) + { + Heap.reserve(PerVDiskInfo.size()); + } + + void Bootstrap() { + Become(&TThis::StateWork); + + for (ui32 i = 0; i < PerVDiskInfo.size(); ++i) { + Request(i); + } + } + + STATEFN(StateWork) { + if (ProcessEvent(ev)) { + return; + } + switch (ev->GetTypeRewrite()) { + hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle); + } + } + + void Request(ui32 orderNumber) { + TPerVDiskInfo& info = PerVDiskInfo[orderNumber]; + Y_VERIFY(!info.Finished()); + + auto maxOpt = [](const auto& x, const auto& y) { return !x ? y : !y ? x : *x < *y ? y : x; }; + + SendToQueue(std::make_unique<TEvBlobStorage::TEvVAssimilate>(Info->GetVDiskId(orderNumber), + maxOpt(SkipBlocksUpTo, info.LastProcessedBlock), + maxOpt(SkipBarriersUpTo, info.LastProcessedBarrier), + maxOpt(SkipBlobsUpTo, info.LastProcessedBlob)), 0); + + ++RequestsInFlight; + } + + void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) { + --RequestsInFlight; + + const auto& record = ev->Get()->Record; + const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID()); + const ui32 orderNumber = Info->GetOrderNumber(vdiskId); + Y_VERIFY(orderNumber < PerVDiskInfo.size()); + + if (record.GetStatus() == NKikimrProto::OK) { + auto& info = PerVDiskInfo[orderNumber]; + info.PushDataFromMessage(record, *this); + if (info.HasItemsToMerge()) { + Heap.push_back(&info); + std::push_heap(Heap.begin(), Heap.end(), TPerVDiskInfo::TCompare()); + } else if (!info.Finished()) { + Request(orderNumber); + } + } + + if (!RequestsInFlight) { + Merge(); + } + } + + void Merge() { + std::vector<ui32> requests; + + const TBlobStorageGroupInfo::TTopology *top = &Info->GetTopology(); + TBlobStorageGroupInfo::TGroupVDisks disksWithData(top); + for (ui32 i = 0; i < PerVDiskInfo.size(); ++i) { + TPerVDiskInfo& info = PerVDiskInfo[i]; + if (info.HasItemsToMerge() || info.Finished()) { + disksWithData += {top, top->GetVDiskId(i)}; + } + } + if (!Info->GetQuorumChecker().CheckQuorumForGroup(disksWithData)) { + if (Result->Blocks.empty() && Result->Barriers.empty() && Result->Blobs.empty()) { + // we didn't get any data, so reply with ERROR to prevent confusing blob depot with 'all finished' situation + ReplyAndDie(NKikimrProto::ERROR); + } else { + // answer with what we have already collected + SendResponseAndDie(std::move(Result)); + } + return; + } + while (requests.empty()) { + if (Heap.empty()) { + SendResponseAndDie(std::move(Result)); + return; + } + + TPerVDiskInfo *heapItem = Heap.front(); + auto item = heapItem->BeginMerge(); + + for (;;) { + std::pop_heap(Heap.begin(), Heap.end(), TPerVDiskInfo::TCompare()); + heapItem->Consume(); + if (heapItem->HasItemsToMerge()) { + heapItem->AdjustNext(); + std::push_heap(Heap.begin(), Heap.end(), TPerVDiskInfo::TCompare()); + } else { + Heap.pop_back(); + const ui32 orderNumber = heapItem - PerVDiskInfo.data(); + requests.push_back(orderNumber); + } + + heapItem = Heap.empty() ? nullptr : Heap.front(); + if (!heapItem || !heapItem->Merge(&item)) { + break; + } + } + + std::visit(TOverloaded{ + [&](TBlock& block) { + SkipBlocksUpTo.emplace(block.TabletId); + Result->Blocks.push_back(block); + }, + [&](TBarrier& barrier) { + SkipBarriersUpTo.emplace(barrier.TabletId, barrier.Channel); + Result->Barriers.push_back(barrier); + }, + [&](TBlob& blob) { + SkipBlobsUpTo.emplace(blob.Id); + Result->Blobs.push_back(blob); + } + }, item); + } + + if (Result->Blocks.size() + Result->Barriers.size() + Result->Blobs.size() >= 10'000) { + SendResponseAndDie(std::move(Result)); + } else { + for (const ui32 orderNumber : requests) { + Request(orderNumber); + } + } + } + + std::unique_ptr<IEventBase> RestartQuery(ui32 counter) { + ++*Mon->NodeMon->RestartAssimilate; + auto ev = std::make_unique<TEvBlobStorage::TEvAssimilate>(SkipBlocksUpTo, SkipBarriersUpTo, SkipBlobsUpTo); + ev->RestartCounter = counter; + return ev; + } + + void ReplyAndDie(NKikimrProto::EReplyStatus status) { + SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, ErrorReason)); + } +}; + +IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info, + const TIntrusivePtr<TGroupQueues>& state, const TActorId& source, + const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev, + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters) { + return new TBlobStorageGroupAssimilateRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters); +} + +} // NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index e03405a42c..47d60411f3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -143,6 +143,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> Mon->EventStatus->Inc(); } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvPatch>) { Mon->EventPatch->Inc(); + } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvAssimilate>) { + Mon->EventAssimilate->Inc(); } } @@ -255,6 +257,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> void HandleNormal(TEvBlobStorage::TEvRange::TPtr &ev); void HandleNormal(TEvBlobStorage::TEvCollectGarbage::TPtr &ev); void HandleNormal(TEvBlobStorage::TEvStatus::TPtr &ev); + void HandleNormal(TEvBlobStorage::TEvAssimilate::TPtr &ev); void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); void Handle(TEvDeathNote::TPtr ev); @@ -362,6 +365,7 @@ public: hFunc(TEvBlobStorage::TEvCollectGarbage, HANDLER); \ hFunc(TEvBlobStorage::TEvStatus, HANDLER); \ hFunc(TEvBlobStorage::TEvPatch, HANDLER); \ + hFunc(TEvBlobStorage::TEvAssimilate, HANDLER); \ /**/ STFUNC(StateUnconfigured) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp index 30440593be..6b617a6b0f 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp @@ -47,6 +47,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni EventStopPutBatching = EventGroup->GetCounter("EvStopPutBatching", true); EventStopGetBatching = EventGroup->GetCounter("EvStopGetBatching", true); EventPatch = EventGroup->GetCounter("EvPatch", true); + EventAssimilate = EventGroup->GetCounter("EvAssimilate", true); PutsSentViaPutBatching = EventGroup->GetCounter("PutsSentViaPutBatching", true); PutBatchesSent = EventGroup->GetCounter("PutBatchesSent", true); @@ -72,6 +73,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni ActiveCollectGarbage = ActiveRequestsGroup->GetCounter("ActiveCollectGarbage"); ActiveStatus = ActiveRequestsGroup->GetCounter("ActiveStatus"); ActivePatch = ActiveRequestsGroup->GetCounter("ActivePatch"); + ActiveAssimilate = ActiveRequestsGroup->GetCounter("ActiveAssimilate"); // special patch counters VPatchContinueFailed = ActiveRequestsGroup->GetCounter("VPatchContinueFailed"); @@ -101,6 +103,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni RespStatCollectGarbage.emplace(respStatGroup->GetSubgroup("request", "collectGarbage")); RespStatStatus.emplace(respStatGroup->GetSubgroup("request", "status")); RespStatPatch.emplace(respStatGroup->GetSubgroup("request", "patch")); + RespStatAssimilate.emplace(respStatGroup->GetSubgroup("request", "assimilate")); } void TBlobStorageGroupProxyMon::BecomeFull() { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h index 59e3007572..5d1a9aa84c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h @@ -212,6 +212,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr EventStopPutBatching; ::NMonitoring::TDynamicCounters::TCounterPtr EventStopGetBatching; ::NMonitoring::TDynamicCounters::TCounterPtr EventPatch; + ::NMonitoring::TDynamicCounters::TCounterPtr EventAssimilate; ::NMonitoring::TDynamicCounters::TCounterPtr PutsSentViaPutBatching; ::NMonitoring::TDynamicCounters::TCounterPtr PutBatchesSent; @@ -231,6 +232,7 @@ public: ::NMonitoring::TDynamicCounters::TCounterPtr ActiveMultiCollect; ::NMonitoring::TDynamicCounters::TCounterPtr ActiveStatus; ::NMonitoring::TDynamicCounters::TCounterPtr ActivePatch; + ::NMonitoring::TDynamicCounters::TCounterPtr ActiveAssimilate; std::optional<TResponseStatusGroup> RespStatPut; std::optional<TResponseStatusGroup> RespStatGet; @@ -240,6 +242,7 @@ public: std::optional<TResponseStatusGroup> RespStatCollectGarbage; std::optional<TResponseStatusGroup> RespStatStatus; std::optional<TResponseStatusGroup> RespStatPatch; + std::optional<TResponseStatusGroup> RespStatAssimilate; // special patch counters ::NMonitoring::TDynamicCounters::TCounterPtr VPatchContinueFailed; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp index 527a10ae86..c0930e7a0e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp @@ -58,6 +58,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters> RestartCollectGarbage = group->GetCounter("EvCollectGarbage", true); RestartIndexRestoreGet = group->GetCounter("EvIndexRestoreGet", true); RestartStatus = group->GetCounter("EvStatus", true); + RestartAssimilate = group->GetCounter("EvAssimilate", true); } { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h index c5fab00d3d..2681b86681 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h @@ -75,6 +75,7 @@ struct TDsProxyNodeMon : public TThrRefBase { ::NMonitoring::TDynamicCounters::TCounterPtr RestartIndexRestoreGet; ::NMonitoring::TDynamicCounters::TCounterPtr RestartStatus; ::NMonitoring::TDynamicCounters::TCounterPtr RestartPatch; + ::NMonitoring::TDynamicCounters::TCounterPtr RestartAssimilate; std::array<::NMonitoring::TDynamicCounters::TCounterPtr, 4> RestartHisto; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index 72a6b09104..87b4d72b5d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -231,6 +231,14 @@ namespace NKikimr { ActiveRequests.insert(reqID); } + void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvAssimilate::TPtr &ev) { + EnsureMonitoring(true); + Mon->EventAssimilate->Inc(); + const TActorId reqID = Register(CreateBlobStorageGroupAssimilateRequest(Info, Sessions->GroupQueues, ev->Sender, + Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), StoragePoolCounters)); + ActiveRequests.insert(reqID); + } + void TBlobStorageGroupProxy::Handle(TEvDeathNote::TPtr ev) { const bool wasEmpty = ResponsivenessTracker.IsEmpty(); for (const auto &item : ev->Get()->Responsiveness) { diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.h b/ydb/core/blobstorage/dsproxy/group_sessions.h index b8dc8c69da..b5f33d2f4e 100644 --- a/ydb/core/blobstorage/dsproxy/group_sessions.h +++ b/ydb/core/blobstorage/dsproxy/group_sessions.h @@ -53,6 +53,10 @@ namespace NKikimr { return NKikimrBlobStorage::EVDiskQueueId::GetFastRead; } + static NKikimrBlobStorage::EVDiskQueueId VDiskQueueId(const TEvBlobStorage::TEvVAssimilate&) { + return NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead; + } + TQueue& GetQueue(NKikimrBlobStorage::EVDiskQueueId queueId) { switch (queueId) { case NKikimrBlobStorage::EVDiskQueueId::PutTabletLog: return PutTabletLog; @@ -179,6 +183,7 @@ namespace NKikimr { } void SetUpSubmitTimestamp(TEvBlobStorage::TEvVStatus& /*event*/) {} + void SetUpSubmitTimestamp(TEvBlobStorage::TEvVAssimilate& /*event*/) {} template<typename TEvent> TActorId Send(const IActor& actor, const TBlobStorageGroupInfo::TTopology& topology, std::unique_ptr<TEvent> event, diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp index c0e6d8f15c..b9404a73b1 100644 --- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp +++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp @@ -49,6 +49,11 @@ namespace NKikimr { ev->Cookie); } + void Handle(TEvBlobStorage::TEvAssimilate::TPtr& ev) { + STLOG(PRI_DEBUG, BS_PROXY, BSPM09, "TEvAssimilate", (Msg, ev->Get()->ToString())); + Send(ev->Sender, new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::ERROR, "not implemented")); + } + void HandlePoison(TEvents::TEvPoisonPill::TPtr& ev) { STLOG(PRI_DEBUG, BS_PROXY, BSPM08, "TEvPoisonPill"); Send(ev->Sender, new TEvents::TEvPoisonTaken); diff --git a/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp b/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp index 20d2058c30..29a83a61c6 100644 --- a/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp @@ -92,21 +92,8 @@ Y_UNIT_TEST_SUITE(VDiskAssimilation) { for (;;) { const TActorId vdiskId = info->GetActorId(i); const TActorId client = runtime->AllocateEdgeActor(vdiskId.NodeId(), __FILE__, __LINE__); - auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(info->GetVDiskId(i)); - { - auto& record = ev->Record; - if (lastBlock) { - record.SetSkipBlocksUpTo(*lastBlock); - } - if (lastBarrier) { - auto *x = record.MutableSkipBarriersUpTo(); - x->SetTabletId(lastBarrier->first); - x->SetChannel(lastBarrier->second); - } - if (lastBlob) { - LogoBlobIDFromLogoBlobID(*lastBlob, record.MutableSkipBlobsUpTo()); - } - } + auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(info->GetVDiskId(i), lastBlock, lastBarrier, + lastBlob); runtime->Send(new IEventHandle(vdiskId, client, ev.release()), vdiskId.NodeId()); auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVAssimilateResult>(client); const auto& record = res->Get()->Record; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 305e4ad722..3393f2ac30 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -2512,8 +2512,20 @@ namespace NKikimr { struct TEvBlobStorage::TEvVAssimilate : TEventPB<TEvVAssimilate, NKikimrBlobStorage::TEvVAssimilate, EvVAssimilate> { TEvVAssimilate() = default; - TEvVAssimilate(const TVDiskID& vdiskId) { + TEvVAssimilate(const TVDiskID& vdiskId, std::optional<ui64> skipBlocksUpTo, + std::optional<std::tuple<ui64, ui8>> skipBarriersUpTo, std::optional<TLogoBlobID> skipBlobsUpTo) { VDiskIDFromVDiskID(vdiskId, Record.MutableVDiskID()); + if (skipBlocksUpTo) { + Record.SetSkipBlocksUpTo(*skipBlocksUpTo); + } + if (skipBarriersUpTo) { + auto *barrier = Record.MutableSkipBarriersUpTo(); + barrier->SetTabletId(std::get<0>(*skipBarriersUpTo)); + barrier->SetChannel(std::get<1>(*skipBarriersUpTo)); + } + if (skipBlobsUpTo) { + LogoBlobIDFromLogoBlobID(*skipBlobsUpTo, Record.MutableSkipBlobsUpTo()); + } } }; @@ -2521,11 +2533,12 @@ namespace NKikimr { { TEvVAssimilateResult() = default; - TEvVAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason) { + TEvVAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason, TVDiskID vdiskId) { Record.SetStatus(status); if (status != NKikimrProto::OK) { Record.SetErrorReason(errorReason); } + VDiskIDFromVDiskID(vdiskId, Record.MutableVDiskID()); } }; diff --git a/ydb/core/blobstorage/vdisk/query/assimilation.cpp b/ydb/core/blobstorage/vdisk/query/assimilation.cpp index 6038d4edcf..07ef96c24d 100644 --- a/ydb/core/blobstorage/vdisk/query/assimilation.cpp +++ b/ydb/core/blobstorage/vdisk/query/assimilation.cpp @@ -15,16 +15,16 @@ namespace NKikimr { std::optional<TKeyBarrier> SkipBarriersUpTo; std::optional<TKeyLogoBlob> SkipBlobsUpTo; ui64 LastRawBlobId[3] = {0, 0, 0}; - size_t RecordSize = CountUnsigned(NKikimrBlobStorage::TEvVAssimilateResult::kStatusFieldNumber, NKikimrProto::OK); + size_t RecordSize; static constexpr TDuration MaxQuantumTime = TDuration::MilliSeconds(10); static constexpr size_t MaxResultSize = 5'000'000; // may overshoot a little public: - TAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev) + TAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev, TVDiskID vdiskId) : Snap(std::move(snap)) , Ev(ev.Release()) - , Result(std::make_unique<TEvBlobStorage::TEvVAssimilateResult>(NKikimrProto::OK, TString())) + , Result(std::make_unique<TEvBlobStorage::TEvVAssimilateResult>(NKikimrProto::OK, TString(), vdiskId)) { const auto& record = Ev->Get()->Record; if (record.HasSkipBlocksUpTo()) { @@ -37,6 +37,8 @@ namespace NKikimr { if (record.HasSkipBlobsUpTo()) { SkipBlobsUpTo.emplace(LogoBlobIDFromLogoBlobID(record.GetSkipBlobsUpTo())); } + + RecordSize = Result->Record.ByteSizeLong(); } void Bootstrap(const TActorId& parentId) { @@ -257,8 +259,8 @@ namespace NKikimr { } }; - IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev) { - return new TAssimilationActor(std::move(snap), ev); + IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev, TVDiskID vdiskId) { + return new TAssimilationActor(std::move(snap), ev, vdiskId); } } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/query/assimilation.h b/ydb/core/blobstorage/vdisk/query/assimilation.h index 8d838cb750..7cda79fe39 100644 --- a/ydb/core/blobstorage/vdisk/query/assimilation.h +++ b/ydb/core/blobstorage/vdisk/query/assimilation.h @@ -7,6 +7,6 @@ namespace NKikimr { - IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev); + IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev, TVDiskID vdiskId); } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index aae8764608..64df734d78 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -1215,9 +1215,21 @@ namespace NKikimr { // ASSIMILATION //////////////////////////////////////////////////////////////////////// + void ReplyError(NKikimrProto::EReplyStatus status, const TString& errorReason, TEvBlobStorage::TEvVAssimilate::TPtr &ev, + const TActorContext &ctx, const TInstant &now) { + using namespace NErrBuilder; + std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, + SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); + } + void Handle(TEvBlobStorage::TEvVAssimilate::TPtr& ev, const TActorContext& ctx) { - const TActorId actorId = RunInBatchPool(ctx, CreateAssimilationActor(Hull->GetIndexSnapshot(), ev)); - ActiveActors.insert(actorId); + if (!SelfVDiskId.SameDisk(ev->Get()->Record.GetVDiskID())) { + ReplyError(NKikimrProto::RACE, "group generation mismatch", ev, ctx, TAppData::TimeProvider->Now()); + } else { + const TActorId actorId = RunInBatchPool(ctx, CreateAssimilationActor(Hull->GetIndexSnapshot(), ev, SelfVDiskId)); + ActiveActors.insert(actorId); + } } //////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h index f209e77fca..f014441a74 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h @@ -439,6 +439,15 @@ namespace NKikimr { return std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(status, vdiskID, now, counterPtr, nullptr, ev->GetChannel()); } + + static inline std::unique_ptr<IEventBase> + ErroneousResult(const TVDiskContextPtr& /*vctx*/, const NKikimrProto::EReplyStatus status, const TString& errorReason, + TEvBlobStorage::TEvVAssimilate::TPtr& /*ev*/, const TInstant& /*now*/, + const TActorIDPtr &/*skeletonFrontIDPtr*/, const TVDiskID &vdiskID, ui64 /*vdiskIncarnationGuid*/, + const TIntrusivePtr<TBlobStorageGroupInfo>& /*groupInfo*/) + { + return std::make_unique<TEvBlobStorage::TEvVAssimilateResult>(status, errorReason, vdiskID); + } } // NErrBuilder } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index fa9fcbd38b..a50f694cef 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -1342,12 +1342,6 @@ namespace NKikimr { ctx.Send(ev->Sender, new TEvBlobStorage::TEvVStatusResult(status, ev->Get()->Record.GetVDiskID()), flags, ev->Cookie); } - - void Reply(TEvBlobStorage::TEvVAssimilate::TPtr& ev, const TActorContext& ctx, - NKikimrProto::EReplyStatus status, const TString& errorReason, TInstant /*now*/) { - const ui32 flags = IEventHandle::MakeFlags(ev->GetChannel(), 0); - ctx.Send(ev->Sender, new TEvBlobStorage::TEvVAssimilateResult(status, errorReason), flags, ev->Cookie); - } // FIXME: don't forget about counters diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 2d498cd2af..d0009a446e 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -616,6 +616,7 @@ message TEvVAssimilateResult { optional NKikimrProto.EReplyStatus Status = 1; optional string ErrorReason = 2; + optional NKikimrBlobStorage.TVDiskID VDiskID = 3; repeated TBlock Blocks = 10; repeated TBarrier Barriers = 11; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 8f27f228f1..a04b2f2289 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -50,6 +50,7 @@ enum EServiceKikimr { BS_VDISK_SCRUB = 344; BS_VDISK_PATCH = 345; BS_VDISK_DEFRAG = 346; + BS_PROXY_ASSIMILATE = 347; // DATASHARD section // TX_DATASHARD = 290; // @@ -903,5 +904,6 @@ message TActivity { BS_STORAGE_STATS_ACTOR = 573; DS_LOAD_ACTOR = 574; PQ_META_CACHE = 575; + BS_GROUP_ASSIMILATE = 576; }; }; |