author     Alexander Rutkovsky <alexvru@ydb.tech>    2025-07-02 12:17:53 +0300
committer  GitHub <noreply@github.com>               2025-07-02 09:17:53 +0000
commit     f0176285cf0af5c49d3d5ea352d57352bfa02751 (patch)
tree       651532ff02366d07c4cbd803741027240d3b7973
parent     bf7fd3e4df3d40b0355f29894aa78709f084c749 (diff)
download   ydb-f0176285cf0af5c49d3d5ea352d57352bfa02751.tar.gz
Introduce BlobStorage syncer machinery for Bridge mode (#20423)
31 files changed, 991 insertions, 31 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 9ab3bb4ce8a..bda07034c5c 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -928,6 +928,8 @@ struct TEvBlobStorage {
         EvNodeWardenUnsubscribeFromCache,
         EvNodeWardenNotifyConfigMismatch,
         EvNodeWardenUpdateConfigFromPeer,
+        EvNodeWardenManageSyncers,
+        EvNodeWardenManageSyncersResult,

         // Other
         EvRunActor = EvPut + 15 * 512,
@@ -2570,6 +2572,10 @@ struct TEvBlobStorage {
             void Output(IOutputStream& s) const {
                 s << "{" << TabletId << "=>" << BlockedGeneration << "}";
             }
+
+            auto GetKey() const {
+                return std::tie(TabletId);
+            }
         };

         struct TBarrier {
@@ -2605,6 +2611,10 @@ struct TEvBlobStorage {
                 Hard.Output(s);
                 s << "}";
             }
+
+            auto GetKey() const {
+                return std::tie(TabletId, Channel);
+            }
         };

         struct TBlob {
@@ -2627,6 +2637,10 @@ struct TEvBlobStorage {
                     s << "d";
                 }
             }
+
+            auto GetKey() const {
+                return std::tie(Id);
+            }
         };

         NKikimrProto::EReplyStatus Status;
diff --git a/ydb/core/base/id_wrapper.h b/ydb/core/base/id_wrapper.h
index de3f4c8842f..9f77b93fdba 100644
--- a/ydb/core/base/id_wrapper.h
+++ b/ydb/core/base/id_wrapper.h
@@ -78,6 +78,9 @@ public:
     T GetRawId() const { return Raw; }

+    static const TIdWrapper Min() { return FromValue(::Min<T>()); }
+    static const TIdWrapper Max() { return FromValue(::Max<T>()); }
+
     friend std::hash<TIdWrapper<T, Tag>>;
     friend THash<TIdWrapper<T, Tag>>;
diff --git a/ydb/core/blobstorage/bridge/defs.h b/ydb/core/blobstorage/bridge/defs.h
new file mode 100644
index 00000000000..d067528eb9e
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/defs.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include <ydb/core/blobstorage/defs.h>
diff --git a/ydb/core/blobstorage/bridge/syncer/defs.h b/ydb/core/blobstorage/bridge/syncer/defs.h
new file mode 100644
index 00000000000..735ee3e1ae5
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/defs.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include <ydb/core/blobstorage/bridge/defs.h>
diff --git a/ydb/core/blobstorage/bridge/syncer/syncer.cpp b/ydb/core/blobstorage/bridge/syncer/syncer.cpp
new file mode 100644
index 00000000000..9481928f4b3
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/syncer.cpp
@@ -0,0 +1,227 @@
+#include "syncer.h"
+#include "syncer_impl.h"
+#include <ydb/core/util/stlog.h>
+
+namespace NKikimr::NStorage::NBridge {
+
+    TSyncerActor::TSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId)
+        : Info(std::move(info))
+        , TargetPileId(targetPileId)
+    {
+        Y_ABORT_UNLESS(!Info || Info->IsBridged());
+    }
+
+    void TSyncerActor::Bootstrap() {
+        LogId = TStringBuilder() << SelfId();
+        STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS00, "bootstrapping bridged blobstorage syncer", (LogId, LogId),
+            (TargetPileId, TargetPileId));
+        Become(&TThis::StateFunc);
+        if (Info) {
+            Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true));
+        }
+    }
+
+    void TSyncerActor::PassAway() {
+        TActorBootstrapped::PassAway();
+    }
+
+    void TSyncerActor::Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev) {
+        if (!Info) {
+            Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true));
+        }
+        Info = std::move(ev->Get()->Info);
+        Y_ABORT_UNLESS(Info);
+        Y_ABORT_UNLESS(Info->IsBridged());
+    }
+
+    void TSyncerActor::Terminate(std::optional<TString> errorReason) {
+        STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS04, "syncing finished", (LogId, LogId), (ErrorReason, errorReason));
+        PassAway();
+    }
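Note on the GetKey() helpers added above: std::tie builds a tuple of references over the identifying fields, and tuples compare lexicographically, so assimilated items can be ordered and matched by key alone. A minimal standalone sketch of the idiom (TBlock here is a simplified stand-in for TEvAssimilateResult::TBlock, not the real type):

    // Sketch: std::tie-based keys order entries by identity, ignoring payload.
    #include <cassert>
    #include <cstdint>
    #include <tuple>

    struct TBlock {
        uint64_t TabletId;
        uint32_t BlockedGeneration; // payload, not part of the key

        auto GetKey() const { return std::tie(TabletId); }
    };

    int main() {
        TBlock a{.TabletId = 1, .BlockedGeneration = 5};
        TBlock b{.TabletId = 2, .BlockedGeneration = 3};
        assert(a.GetKey() < b.GetKey()); // ordering depends only on TabletId
    }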
+
+    void TSyncerActor::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
+        const bool initial = !BridgeInfo;
+        BridgeInfo = ev->Get()->BridgeInfo;
+        Y_ABORT_UNLESS(BridgeInfo);
+        const auto& targetPile = *BridgeInfo->GetPile(TargetPileId);
+        if (targetPile.State != NKikimrBridge::TClusterState::NOT_SYNCHRONIZED) {
+            // there is absolutely no need for synchronization: the pile is either marked synchronized (which would
+            // be strange), or it is disconnected; we terminate in either case
+            return Terminate("target pile is not NOT_SYNCHRONIZED anymore");
+        }
+        if (!initial && BridgeInfo->GetPile(SourcePileId)->State != NKikimrBridge::TClusterState::SYNCHRONIZED) {
+            return Terminate("source pile is not SYNCHRONIZED anymore");
+        }
+        if (initial) {
+            InitiateSync();
+        }
+    }
+
+    void TSyncerActor::InitiateSync() {
+        // remember our source pile (the primary one) on first start; actually, we can pick any SYNCHRONIZED pile
+        Y_ABORT_UNLESS(BridgeInfo->PrimaryPile->State == NKikimrBridge::TClusterState::SYNCHRONIZED);
+        SourcePileId = BridgeInfo->PrimaryPile->BridgePileId;
+        const auto& groups = Info->GetBridgeGroupIds();
+        Y_ABORT_UNLESS(groups.size() == std::size(BridgeInfo->Piles));
+        SourceGroupId = groups[SourcePileId.GetRawId()];
+        TargetGroupId = groups[TargetPileId.GetRawId()];
+        LogId = TStringBuilder() << LogId << '{' << SourceGroupId << "->" << TargetGroupId << '}';
+        SourceState = &GroupAssimilateState[SourceGroupId];
+        TargetState = &GroupAssimilateState[TargetGroupId];
+        IssueAssimilateRequest(SourceGroupId);
+        IssueAssimilateRequest(TargetGroupId);
+        STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS01, "initiating sync", (LogId, LogId), (SourceGroupId, SourceGroupId),
+            (TargetGroupId, TargetGroupId));
+    }
+
+    void TSyncerActor::DoMergeLoop() {
+        if (SourceState->RequestInFlight || TargetState->RequestInFlight) {
+            return; // nothing to merge yet
+        }
+
+        auto mergeBlocks = [&](auto *sourceItem, auto *targetItem) {
+            STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS05, "merging block", (LogId, LogId), (SourceItem, sourceItem),
+                (TargetItem, targetItem));
+        };
+
+        auto mergeBarriers = [&](auto *sourceItem, auto *targetItem) {
+            STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS06, "merging barrier", (LogId, LogId), (SourceItem, sourceItem),
+                (TargetItem, targetItem));
+        };
+
+        auto mergeBlobs = [&](auto *sourceItem, auto *targetItem) {
+            STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS03, "merging blob", (LogId, LogId), (SourceItem, sourceItem),
+                (TargetItem, targetItem));
+        };
+
+#define MERGE(NAME) \
+        if (!DoMergeEntities(SourceState->NAME, TargetState->NAME, SourceState->NAME##Finished, TargetState->NAME##Finished, merge##NAME)) { \
+            return; \
+        }
+        MERGE(Blocks)
+        MERGE(Barriers)
+        MERGE(Blobs)
+#undef MERGE
+    }
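DoMergeLoop() drives a classic merge-join over the per-group queues: both are sorted by GetKey(), and each step advances the side(s) holding the least key. The same control flow in a runnable, simplified form (TItem and the termination rules here are reduced assumptions; the real code additionally tracks the *Finished flags and refills the queues):

    #include <deque>
    #include <iostream>
    #include <tuple>

    struct TItem {
        int Id;
        auto GetKey() const { return std::tie(Id); }
    };

    template<typename T, typename TCallback>
    void MergeEntities(std::deque<T>& source, std::deque<T>& target, TCallback&& merge) {
        while (!source.empty() || !target.empty()) {
            T *sourceItem = source.empty() ? nullptr : &source.front();
            T *targetItem = target.empty() ? nullptr : &target.front();
            if (sourceItem && targetItem) { // keep only the side(s) with the least key
                if (sourceItem->GetKey() < targetItem->GetKey()) {
                    targetItem = nullptr;
                } else if (targetItem->GetKey() < sourceItem->GetKey()) {
                    sourceItem = nullptr;
                }
            }
            merge(sourceItem, targetItem); // at most one argument is null
            if (sourceItem) {
                source.pop_front();
            }
            if (targetItem) {
                target.pop_front();
            }
        }
    }

    int main() {
        std::deque<TItem> a{{1}, {3}}, b{{2}, {3}};
        MergeEntities(a, b, [](TItem *s, TItem *t) {
            std::cout << (s ? s->Id : -1) << ' ' << (t ? t->Id : -1) << '\n'; // 1/-1, -1/2, 3/3
        });
    }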
+
+    template<typename T, typename TCallback>
+    bool TSyncerActor::DoMergeEntities(std::deque<T>& source, std::deque<T>& target, bool sourceFinished, bool targetFinished,
+            TCallback&& merge) {
+        while ((!source.empty() || sourceFinished) && (!target.empty() || targetFinished)) {
+            auto *sourceItem = source.empty() ? nullptr : &source.front();
+            auto *targetItem = target.empty() ? nullptr : &target.front();
+            if (!sourceItem && !targetItem) { // both queues are exhausted
+                Y_ABORT_UNLESS(sourceFinished && targetFinished);
+                return true;
+            } else if (sourceItem && targetItem) { // we have items in both queues and have to pick one according to the key
+                const auto& sourceKey = sourceItem->GetKey();
+                const auto& targetKey = targetItem->GetKey();
+                if (sourceKey < targetKey) {
+                    targetItem = nullptr;
+                } else if (targetKey < sourceKey) {
+                    sourceItem = nullptr;
+                }
+            }
+            merge(sourceItem, targetItem);
+            if (sourceItem) {
+                source.pop_front();
+            }
+            if (targetItem) {
+                target.pop_front();
+            }
+        }
+
+        if (!sourceFinished && source.empty()) {
+            IssueAssimilateRequest(SourceGroupId);
+        }
+        if (!targetFinished && target.empty()) {
+            IssueAssimilateRequest(TargetGroupId);
+        }
+
+        return false;
+    }
+
+    void TSyncerActor::IssueAssimilateRequest(TGroupId groupId) {
+        const auto it = GroupAssimilateState.find(groupId);
+        Y_ABORT_UNLESS(it != GroupAssimilateState.end());
+        TAssimilateState& state = it->second;
+
+        Y_ABORT_UNLESS(!state.BlocksFinished || !state.BarriersFinished || !state.BlobsFinished);
+
+        STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS02, "issuing assimilate request", (LogId, LogId), (GroupId, groupId),
+            (SkipBlocksUpTo, state.SkipBlocksUpTo ? ToString(*state.SkipBlocksUpTo) : "<none>"),
+            (SkipBarriersUpTo, state.SkipBarriersUpTo ? TString(TStringBuilder() << '[' <<
+                std::get<0>(*state.SkipBarriersUpTo) << ':' << std::get<1>(*state.SkipBarriersUpTo) << ']') : "<none>"),
+            (SkipBlobsUpTo, state.SkipBlobsUpTo ? state.SkipBlobsUpTo->ToString() : "<none>"));
+
+        SendToBSProxy(SelfId(), it->first, new TEvBlobStorage::TEvAssimilate(state.SkipBlocksUpTo, state.SkipBarriersUpTo,
+            state.SkipBlobsUpTo, /*ignoreDecommitState=*/ true, /*reverse=*/ true), groupId.GetRawId());
+
+        Y_ABORT_UNLESS(!state.RequestInFlight);
+        state.RequestInFlight = true;
+    }
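IssueAssimilateRequest() keeps a resumption cursor per entity kind (SkipBlocksUpTo, SkipBarriersUpTo, SkipBlobsUpTo) and asks the group only for entries beyond it, refilling each queue as it drains. The cursor pattern in isolation, with Fetch() as a hypothetical stand-in for the TEvAssimilate round trip:

    #include <cstdint>
    #include <optional>
    #include <vector>

    // Hypothetical data source standing in for a storage group.
    static const std::vector<uint64_t> Keys{1, 4, 9, 16, 25};

    // Return up to `limit` keys strictly greater than `skipUpTo`.
    std::vector<uint64_t> Fetch(std::optional<uint64_t> skipUpTo, size_t limit) {
        std::vector<uint64_t> res;
        for (uint64_t k : Keys) {
            if ((!skipUpTo || *skipUpTo < k) && res.size() < limit) {
                res.push_back(k);
            }
        }
        return res;
    }

    int main() {
        std::optional<uint64_t> cursor; // SkipBlobsUpTo analogue: the resume point
        for (;;) {
            auto page = Fetch(cursor, /*limit=*/ 2);
            if (page.empty()) {
                break; // stream exhausted
            }
            cursor.emplace(page.back()); // next request skips everything up to here
        }
    }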
+
+    void TSyncerActor::Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev) {
+        const auto groupId = TGroupId::FromValue(ev->Cookie);
+
+        const auto it = GroupAssimilateState.find(groupId);
+        Y_ABORT_UNLESS(it != GroupAssimilateState.end());
+        TAssimilateState& state = it->second;
+
+        Y_ABORT_UNLESS(state.RequestInFlight);
+        state.RequestInFlight = false;
+
+        auto& msg = *ev->Get();
+
+        if (!state.BlocksFinished && (msg.Blocks.empty() || !msg.Barriers.empty() || !msg.Blobs.empty())) {
+            state.BlocksFinished = true;
+            state.SkipBlocksUpTo.emplace(); // the last value, as we are iterating in reverse
+        } else {
+            state.SkipBlocksUpTo.emplace(msg.Blocks.back().TabletId);
+        }
+        if (state.BlocksFinished) {
+            if (!state.BarriersFinished && (msg.Barriers.empty() || !msg.Blobs.empty())) {
+                state.BarriersFinished = true;
+                state.SkipBarriersUpTo.emplace(); // the same logic for barriers
+            } else {
+                auto& lastBarrier = msg.Barriers.back();
+                state.SkipBarriersUpTo.emplace(lastBarrier.TabletId, lastBarrier.Channel);
+            }
+            if (state.BarriersFinished && msg.Blobs.empty()) {
+                state.BlobsFinished = true;
+            } else {
+                state.SkipBlobsUpTo.emplace(msg.Blobs.back().Id);
+            }
+        }
+
+        if (state.Blocks.empty()) {
+            state.Blocks = std::move(msg.Blocks);
+        } else {
+            state.Blocks.insert(state.Blocks.end(), msg.Blocks.begin(), msg.Blocks.end());
+        }
+        if (state.Barriers.empty()) {
+            state.Barriers = std::move(msg.Barriers);
+        } else {
+            state.Barriers.insert(state.Barriers.end(), msg.Barriers.begin(), msg.Barriers.end());
+        }
+        if (state.Blobs.empty()) {
+            state.Blobs = std::move(msg.Blobs);
+        } else {
+            state.Blobs.insert(state.Blobs.end(), msg.Blobs.begin(), msg.Blobs.end());
+        }
+
+        DoMergeLoop();
+    }
+
+    STRICT_STFUNC(TSyncerActor::StateFunc,
+        hFunc(TEvBlobStorage::TEvConfigureProxy, Handle)
+        hFunc(TEvBlobStorage::TEvAssimilateResult, Handle)
+        hFunc(TEvNodeWardenStorageConfig, Handle)
+        cFunc(TEvents::TSystem::Poison, PassAway)
+    )
+
+    IActor *CreateSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId) {
+        return new TSyncerActor(std::move(info), targetPileId);
+    }
+
+} // NKikimr::NStorage::NBridge
diff --git a/ydb/core/blobstorage/bridge/syncer/syncer.h b/ydb/core/blobstorage/bridge/syncer/syncer.h
new file mode 100644
index 00000000000..cef450e6aba
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/syncer.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "defs.h"
+
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+
+namespace NKikimr::NStorage::NBridge {
+
+    IActor *CreateSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId);
+
+} // NKikimr::NStorage::NBridge
diff --git a/ydb/core/blobstorage/bridge/syncer/syncer_impl.h b/ydb/core/blobstorage/bridge/syncer/syncer_impl.h
new file mode 100644
index 00000000000..2cabbac822a
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/syncer_impl.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include "defs.h"
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/core/base/blobstorage.h>
+#include <ydb/core/base/bridge.h>
+#include <ydb/core/blobstorage/base/blobstorage_events.h>
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+
+#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> // for TEvConfigureProxy
+
+namespace NKikimr::NStorage::NBridge {
+
+    class TSyncerActor : public TActorBootstrapped<TSyncerActor> {
+        TIntrusivePtr<TBlobStorageGroupInfo> Info;
+        TBridgePileId TargetPileId;
+        TBridgePileId SourcePileId;
+        TGroupId SourceGroupId;
+        TGroupId TargetGroupId;
+        TBridgeInfo::TPtr BridgeInfo;
+        TString LogId;
+
+    public:
+        TSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId);
+
+        void Bootstrap();
+        void PassAway() override;
+
+        void Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev);
+
+        void Terminate(std::optional<TString> errorReason);
+
+        STFUNC(StateFunc);
+
+        void Handle(TEvNodeWardenStorageConfig::TPtr ev);
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Main sync logic
+
+        void InitiateSync();
+        void DoMergeLoop();
+
+        template<typename T, typename TCallback>
+        bool DoMergeEntities(std::deque<T>& source, std::deque<T>& target, bool sourceFinished, bool targetFinished,
+            TCallback&& merge);
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Per-group assimilation status
+
+        struct TAssimilateState {
+            std::optional<ui64> SkipBlocksUpTo;
+            std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo;
+            std::optional<TLogoBlobID> SkipBlobsUpTo;
+            std::deque<TEvBlobStorage::TEvAssimilateResult::TBlock> Blocks;
+            std::deque<TEvBlobStorage::TEvAssimilateResult::TBarrier> Barriers;
+            std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> Blobs;
+            bool RequestInFlight = false;
+            bool BlocksFinished = false;
+            bool BarriersFinished = false;
+            bool BlobsFinished = false;
+        };
+        THashMap<TGroupId, TAssimilateState> GroupAssimilateState;
+        TAssimilateState *TargetState = nullptr;
+        TAssimilateState *SourceState = nullptr;
+
+        void IssueAssimilateRequest(TGroupId groupId);
+        void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev);
+    };
+
+} // NKikimr::NStorage::NBridge
diff --git a/ydb/core/blobstorage/bridge/syncer/ya.make b/ydb/core/blobstorage/bridge/syncer/ya.make
new file mode 100644
index 00000000000..26dddfb1ec1
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+    SRCS(
+        defs.h
+        syncer.cpp
+        syncer.h
+    )
+
+    PEERDIR(
+        ydb/core/base
+        ydb/core/blobstorage/vdisk/common
+    )
+END()
diff --git a/ydb/core/blobstorage/bridge/ya.make b/ydb/core/blobstorage/bridge/ya.make
new file mode 100644
index 00000000000..2ccbd448541
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/ya.make
@@ -0,0 +1,3 @@
+RECURSE(
+    syncer
+)
diff --git a/ydb/core/blobstorage/nodewarden/distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp
index 6e92aec74ae..798fbe91101 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf.cpp
@@ -378,6 +378,7 @@ namespace NKikimr::NStorage {
                 hFunc(TEvNodeWardenQueryCache, Handle);
                 hFunc(TEvNodeWardenUnsubscribeFromCache, Handle);
                 hFunc(TEvNodeWardenUpdateConfigFromPeer, Handle);
+                hFunc(TEvNodeWardenManageSyncersResult, Handle);
             )
             for (ui32 nodeId : std::exchange(UnsubscribeQueue, {})) {
                 UnsubscribeInterconnect(nodeId);
diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h
index 80aee341461..67b3b4efec0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.h
+++ b/ydb/core/blobstorage/nodewarden/distconf.h
@@ -261,6 +261,13 @@ namespace NKikimr::NStorage {
         bool ScepterlessOperationInProgress = false; // when a leader operation is running while no Scepter is acquired
         TString ErrorReason;
         std::optional<TString> CurrentSelfAssemblyUUID;
+        bool ConfigsCollected = false;
+
+        // bridge-related logic
+        std::set<std::tuple<ui32, TGroupId, TBridgePileId>> WorkingSyncersByNode;
+        std::set<std::tuple<TGroupId, TBridgePileId, ui32>> WorkingSyncers;
+        bool SyncerArrangeInFlight = false;
+        bool SyncerArrangePending = false;

         // subscribed IC sessions
         struct TSessionSubscription {
@@ -369,6 +376,7 @@ namespace NKikimr::NStorage {
         void CheckRootNodeStatus();
         void BecomeRoot();
         void UnbecomeRoot();
+        void CheckIfDone();
         void HandleErrorTimeout();
         void ProcessGather(TEvGather *res);
         bool HasQuorum(const NKikimrBlobStorage::TStorageConfig& config) const;
@@ -410,6 +418,23 @@ namespace NKikimr::NStorage {
         void SwitchToError(const TString& reason);

         ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Bridge ops
+
+        // preparation (may be async)
+        void PrepareScatterTask(ui64 cookie, TScatterTask& task, const TEvScatter::TManageSyncers& request);
+        void Handle(TEvNodeWardenManageSyncersResult::TPtr ev);
+
+        // execute per-node action
+        void Perform(TEvGather::TManageSyncers *response, const TEvScatter::TManageSyncers& request, TScatterTask& task);
+
+        // handle gather result (when completed all along the cluster)
+        void ProcessManageSyncers(TEvGather::TManageSyncers *res);
+
+        void RearrangeSyncing();
+        void OnSyncerUnboundNode(ui32 nodeId);
+        void IssueQuerySyncers();
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
         // Scatter/gather logic

         void IssueScatterTask(std::optional<TActorId> actorId, TEvScatter&& request);
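WorkingSyncersByNode and WorkingSyncers declared above hold the same (node, group, pile) records under two tuple orderings, so "every syncer of a node" and "every node running a given (group, pile) sync" are both contiguous range scans. A reduced sketch of the dual-index technique with plain integer ids:

    #include <cstdint>
    #include <set>
    #include <tuple>

    using TNodeId = uint32_t;
    using TGroup = uint32_t;
    using TPile = uint32_t;

    std::set<std::tuple<TNodeId, TGroup, TPile>> byNode;
    std::set<std::tuple<TGroup, TPile, TNodeId>> byGroup;

    void EraseNode(TNodeId nodeId) {
        // All entries of one node are contiguous under byNode's ordering.
        const auto begin = byNode.lower_bound({nodeId, 0, 0});
        const auto end = byNode.upper_bound({nodeId, UINT32_MAX, UINT32_MAX});
        for (auto it = begin; it != end; ++it) {
            const auto& [n, g, p] = *it;
            byGroup.erase({g, p, n}); // keep the second index consistent
        }
        byNode.erase(begin, end);
    }

    int main() {
        byNode.insert({1, 10, 0});
        byGroup.insert({10, 0, 1});
        EraseNode(1);
    }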
diff --git a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
index f0ca80520f2..21b596ed381 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
@@ -564,6 +564,8 @@ namespace NKikimr::NStorage {

             UnsubscribeQueue.insert(nodeId);

+            OnSyncerUnboundNode(nodeId);
+
             CheckRootNodeStatus();
         }
     }
diff --git a/ydb/core/blobstorage/nodewarden/distconf_bridge.cpp b/ydb/core/blobstorage/nodewarden/distconf_bridge.cpp
new file mode 100644
index 00000000000..67d1982c3a0
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_bridge.cpp
@@ -0,0 +1,202 @@
+#include "distconf.h"
+
+namespace NKikimr::NStorage {
+
+    void TDistributedConfigKeeper::PrepareScatterTask(ui64 cookie, TScatterTask& task, const TEvScatter::TManageSyncers& request) {
+        // issue query to node warden and wait until its completion
+        std::vector<TEvNodeWardenManageSyncers::TSyncer> runSyncers;
+        for (const auto& item : request.GetRunSyncers()) {
+            runSyncers.push_back({
+                .NodeId = item.GetNodeId(),
+                .GroupId = TGroupId::FromProto(&item, &NKikimrBlobStorage::TStorageSyncerInfo::GetGroupId),
+                .TargetBridgePileId = TBridgePileId::FromProto(&item, &NKikimrBlobStorage::TStorageSyncerInfo::GetTargetBridgePileId),
+            });
+        }
+        Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenManageSyncers(std::move(runSyncers)), 0, cookie);
+        ++task.AsyncOperationsPending;
+    }
+
+    void TDistributedConfigKeeper::Handle(TEvNodeWardenManageSyncersResult::TPtr ev) {
+        if (auto it = ScatterTasks.find(ev->Cookie); it != ScatterTasks.end()) {
+            TScatterTask& task = it->second;
+            auto *response = task.Response.MutableManageSyncers();
+            auto *node = response->AddNodes();
+            node->SetNodeId(SelfId().NodeId());
+            for (const auto& workingSyncer : ev->Get()->WorkingSyncers) {
+                auto *item = node->AddSyncers();
+                workingSyncer.GroupId.CopyToProto(item, &NKikimrBlobStorage::TStorageSyncerInfo::SetGroupId);
+                workingSyncer.TargetBridgePileId.CopyToProto(item, &NKikimrBlobStorage::TStorageSyncerInfo::SetTargetBridgePileId);
+            }
+        }
+        FinishAsyncOperation(ev->Cookie);
+    }
+
+    void TDistributedConfigKeeper::Perform(TEvGather::TManageSyncers *response, const TEvScatter::TManageSyncers& /*request*/, TScatterTask& task) {
+        THashMap<ui32, TEvGather::TManageSyncers::TNode*> nodeMap;
+        for (size_t i = 0; i < response->NodesSize(); ++i) {
+            auto *node = response->MutableNodes(i);
+            nodeMap.emplace(node->GetNodeId(), node);
+        }
+        for (const auto& reply : task.CollectedResponses) {
+            for (const auto& node : reply.GetManageSyncers().GetNodes()) {
+                if (const auto it = nodeMap.find(node.GetNodeId()); it != nodeMap.end()) {
+                    it->second->MergeFrom(node);
+                } else {
+                    auto *newNode = response->AddNodes();
+                    newNode->CopyFrom(node);
+                    nodeMap.emplace(newNode->GetNodeId(), newNode);
+                }
+            }
+        }
+    }
+
+    void TDistributedConfigKeeper::ProcessManageSyncers(TEvGather::TManageSyncers *res) {
+        // actualize WorkingSyncers with the received data
+        for (const auto& node : res->GetNodes()) {
+            const ui32 nodeId = node.GetNodeId();
+
+            // trim the currently known working syncers for this node (as we have received fresher info)
+            const auto min = std::make_tuple(nodeId, TGroupId::Min(), TBridgePileId::Min());
+            const auto begin = WorkingSyncersByNode.lower_bound(min);
+            const auto max = std::make_tuple(nodeId, TGroupId::Max(), TBridgePileId::Max());
+            const auto end = WorkingSyncersByNode.upper_bound(max);
+            for (auto it = begin; it != end; ++it) {
+                const auto& [nodeId, groupId, targetBridgePileId] = *it;
+                const size_t num = WorkingSyncers.erase(std::make_tuple(groupId, targetBridgePileId, nodeId));
+                Y_ABORT_UNLESS(num == 1);
+            }
+            WorkingSyncersByNode.erase(begin, end);
+
+            // insert the newly received syncers into the set
+            for (const auto& item : node.GetSyncers()) {
+                using T = std::decay_t<decltype(item)>;
+                const auto groupId = TGroupId::FromProto(&item, &T::GetGroupId);
+                const auto targetBridgePileId = TBridgePileId::FromProto(&item, &T::GetTargetBridgePileId);
+                const bool ins1 = WorkingSyncersByNode.emplace(nodeId, groupId, targetBridgePileId).second;
+                const bool ins2 = WorkingSyncers.emplace(groupId, targetBridgePileId, nodeId).second;
+                Y_ABORT_UNLESS(ins1 == ins2);
+            }
+        }
+
+        // apply changes
+        RearrangeSyncing();
+    }
+
+    void TDistributedConfigKeeper::RearrangeSyncing() {
+        // run new syncers, stop unneeded ones
+        TEvScatter task;
+        TEvScatter::TManageSyncers *manage = nullptr;
+
+        auto getManage = [&] {
+            if (!manage) {
+                manage = task.MutableManageSyncers();
+                task.SetTaskId(RandomNumber<ui64>());
+            }
+            return manage;
+        };
+
+        for (auto it = WorkingSyncers.begin(); it != WorkingSyncers.end(); ) {
+            const auto& [groupId, targetBridgePileId, nodeId] = *it;
+
+            // find the end of this sequence
+            size_t numItems = 0;
+            auto jt = it;
+            while (jt != WorkingSyncers.end() && std::get<0>(*jt) == groupId && std::get<1>(*jt) == targetBridgePileId) {
+                ++jt, ++numItems;
+            }
+
+            // we have to terminate the excessive syncer(s) here
+            if (numItems > 1) {
+                auto *entry = getManage()->AddRunSyncers(); // explicitly leave only a single one running
+                const auto& [groupId, targetBridgePileId, nodeId] = *it;
+                entry->SetNodeId(nodeId);
+                groupId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetGroupId);
+                targetBridgePileId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetTargetBridgePileId);
+            }
+
+            // advance to the next pair
+            it = jt;
+        }
+
+        std::vector<ui32> nodes;
+        auto prepareNodes = [&] {
+            if (nodes.empty()) {
+                // build the list of all nodes expected to be working
+                nodes.push_back(SelfNode.NodeId());
+                for (const auto& [nodeId, info] : AllBoundNodes) {
+                    nodes.push_back(nodeId.NodeId());
+                }
+                std::ranges::sort(nodes);
+            }
+        };
+
+        Y_ABORT_UNLESS(StorageConfig);
+        const auto& history = StorageConfig->GetClusterStateHistory();
+        for (const auto& item : history.GetPileSyncState()) {
+            const auto bridgePileId = TBridgePileId::FromProto(&item,
+                &NKikimrBridge::TClusterStateHistory::TPileSyncState::GetBridgePileId);
+
+            for (const auto& groupIdNum : item.GetUnsyncedGroupIds()) {
+                const auto groupId = TGroupId::FromValue(groupIdNum);
+
+                const auto key = std::make_tuple(groupId, bridgePileId, 0);
+                const auto maxKey = std::make_tuple(groupId, bridgePileId, Max<ui32>());
+                const auto it = WorkingSyncers.lower_bound(key);
+                if (it != WorkingSyncers.end() && *it <= maxKey) {
+                    continue; // syncer is already running
+                }
+
+                auto *entry = getManage()->AddRunSyncers();
+                const size_t hash = MultiHash(groupId, bridgePileId);
+                prepareNodes();
+                entry->SetNodeId(nodes[hash % nodes.size()]);
+                groupId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetGroupId);
+                bridgePileId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetTargetBridgePileId);
+            }
+        }
+
+        if (manage) {
+            IssueScatterTask(TActorId(), std::move(task));
+        } else {
+            Y_ABORT_UNLESS(SyncerArrangeInFlight);
+            SyncerArrangeInFlight = false;
+            if (std::exchange(SyncerArrangePending, false)) {
+                IssueQuerySyncers();
+            }
+        }
+    }
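RearrangeSyncing() places a missing syncer by hashing (group, pile) into the sorted list of currently bound nodes, so any root recomputing the assignment reaches the same answer without extra coordination. A sketch of that deterministic pick (std::hash over a packed key stands in for the MultiHash call used above):

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <vector>

    uint32_t PickNode(const std::vector<uint32_t>& sortedNodes, uint32_t groupId, uint32_t pileId) {
        // Same inputs -> same node, as long as every caller sees the same sorted list.
        const size_t hash = std::hash<uint64_t>{}((uint64_t(groupId) << 32) | pileId);
        return sortedNodes[hash % sortedNodes.size()];
    }

    int main() {
        std::vector<uint32_t> nodes{1, 2, 3, 5, 8}; // must be sorted and identical on every caller
        (void)PickNode(nodes, /*groupId=*/ 2181038080, /*pileId=*/ 1);
    }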
+
+    void TDistributedConfigKeeper::OnSyncerUnboundNode(ui32 nodeId) {
+        bool changes = false;
+
+        const auto min = std::make_tuple(nodeId, TGroupId::Min(), TBridgePileId::Min());
+        for (auto it = WorkingSyncersByNode.lower_bound(min); it != WorkingSyncersByNode.end() &&
+                std::get<0>(*it) == nodeId; it = WorkingSyncersByNode.erase(it)) {
+            const auto& [nodeId, groupId, targetBridgePileId] = *it;
+            const size_t num = WorkingSyncers.erase(std::make_tuple(groupId, targetBridgePileId, nodeId));
+            Y_ABORT_UNLESS(num == 1);
+            changes = true;
+        }
+
+        if (changes) {
+            RearrangeSyncing();
+        }
+    }
+
+    void TDistributedConfigKeeper::IssueQuerySyncers() {
+        if (!Cfg->BridgeConfig) {
+            return;
+        }
+        if (!SyncerArrangeInFlight) {
+            STLOG(PRI_DEBUG, BS_NODE, NWDC82, "Starting syncer collection", (Scepter, Scepter->Id), (RootState, RootState));
+            TEvScatter task;
+            task.SetTaskId(RandomNumber<ui64>());
+            task.MutableManageSyncers();
+            IssueScatterTask(TActorId(), std::move(task));
+            SyncerArrangeInFlight = true;
+        } else {
+            SyncerArrangePending = true;
+        }
+    }
+
+} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp b/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp
index 94bfdd03601..c4664da6548 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp
@@ -82,7 +82,7 @@ namespace NKikimr::NStorage {
                 if (nodeId <= AppData()->DynamicNameserviceConfig->MaxStaticNodeId) {
                     continue; // static nodes were already processed
                 }
-                STLOG(PRI_DEBUG, BS_NODE, NWDCC00, "disconnecting dynamic", (PeerNodeId, nodeId),
+                STLOG(PRI_DEBUG, BS_NODE, NWDCC03, "disconnecting dynamic", (PeerNodeId, nodeId),
                     (BridgePileId, pile.BridgePileId));
                 as->Send(new IEventHandle(TEvInterconnect::EvDisconnect, 0, as->InterconnectProxy(nodeId),
                     {}, nullptr, 0));
diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
index 47d585a0dc6..b2c11433c24 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
@@ -26,23 +26,24 @@ namespace NKikimr::NStorage {
     }

     void TDistributedConfigKeeper::BecomeRoot() {
-        auto makeAllBoundNodes = [&] {
-            TStringStream s;
-            const char *sep = "{";
-            for (const auto& [nodeId, _] : AllBoundNodes) {
-                s << std::exchange(sep, " ") << nodeId;
-            }
-            s << '}';
-            return s.Str();
-        };
-        STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection", (Scepter, Scepter->Id),
-            (AllBoundNodes, makeAllBoundNodes()));
-        RootState = ERootState::IN_PROGRESS;
+        RootState = ERootState::IN_PROGRESS; // collecting configs at least
+
+        WorkingSyncersByNode.clear();
+        WorkingSyncers.clear();
+        SyncerArrangeInFlight = false;
+        SyncerArrangePending = false;
+
+        // start collecting configs from all bound nodes
+        STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection", (Scepter, Scepter->Id));
+        ConfigsCollected = false;
         TEvScatter task;
         task.SetTaskId(RandomNumber<ui64>());
         task.MutableCollectConfigs();
         IssueScatterTask(TActorId(), std::move(task));

+        // start collecting syncers state if needed
+        IssueQuerySyncers();
+
         // establish connection to console tablet (if we have means to do it)
         Y_ABORT_UNLESS(!ConsolePipeId);
         ConnectToConsole();
@@ -52,6 +53,12 @@ namespace NKikimr::NStorage {
         DisconnectFromConsole();
     }

+    void TDistributedConfigKeeper::CheckIfDone() {
+        if (RootState == ERootState::IN_PROGRESS && ConfigsCollected) {
+            RootState = ERootState::RELAX;
+        }
+    }
+
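SyncerArrangeInFlight/SyncerArrangePending, reset in BecomeRoot() and consumed in IssueQuerySyncers()/RearrangeSyncing(), implement a "busy plus pending" latch: while one arrangement pass runs, any number of further triggers collapse into a single queued rerun. The pattern in isolation:

    #include <iostream>

    struct TCoalescer {
        bool InFlight = false;
        bool Pending = false;

        void Trigger() {
            if (InFlight) {
                Pending = true; // remember that we must run once more
            } else {
                InFlight = true;
                std::cout << "pass started\n"; // kick off the actual work here
            }
        }

        void OnPassDone() {
            InFlight = false;
            if (Pending) {
                Pending = false;
                Trigger(); // exactly one rerun, however many triggers arrived
            }
        }
    };

    int main() {
        TCoalescer c;
        c.Trigger();
        c.Trigger(); // coalesced
        c.Trigger(); // coalesced
        c.OnPassDone(); // starts the single queued rerun
        c.OnPassDone();
    }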
     void TDistributedConfigKeeper::SwitchToError(const TString& reason) {
         STLOG(PRI_NOTICE, BS_NODE, NWDC38, "SwitchToError", (RootState, RootState), (Reason, reason));
         if (Scepter) {
@@ -93,10 +100,14 @@ namespace NKikimr::NStorage {
                 if (auto error = ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) {
                     SwitchToError(*error);
                 } else {
-                    RootState = ERootState::RELAX;
+                    ConfigsCollected = true;
+                    CheckIfDone();
                 }
                 return;

+            case TEvGather::kManageSyncers:
+                return ProcessManageSyncers(res->MutableManageSyncers());
+
             case TEvGather::RESPONSE_NOT_SET:
                 return SwitchToError("response not set");
         }
@@ -118,7 +129,8 @@ namespace NKikimr::NStorage {
         std::visit(TOverloaded{
             [&](std::monostate&) {
                 STLOG(PRI_DEBUG, BS_NODE, NWDC61, "ProcessCollectConfigs: monostate");
-                RootState = ERootState::RELAX;
+                ConfigsCollected = true;
+                CheckIfDone();
             },
             [&](TString& error) {
                 STLOG(PRI_DEBUG, BS_NODE, NWDC63, "ProcessCollectConfigs: error", (Error, error));
@@ -540,6 +552,10 @@ namespace NKikimr::NStorage {
                 }
                 break;

+            case TEvScatter::kManageSyncers:
+                PrepareScatterTask(cookie, task, task.Request.GetManageSyncers());
+                break;
+
             case TEvScatter::REQUEST_NOT_SET:
                 break;
         }
@@ -555,6 +571,10 @@ namespace NKikimr::NStorage {
                 Perform(task.Response.MutableProposeStorageConfig(), task.Request.GetProposeStorageConfig(), task);
                 break;

+            case TEvScatter::kManageSyncers:
+                Perform(task.Response.MutableManageSyncers(), task.Request.GetManageSyncers(), task);
+                break;
+
             case TEvScatter::REQUEST_NOT_SET:
                 // unexpected case
                 break;
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.h b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
index e1be1c5f34b..2216b1390a0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke.h
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
@@ -15,6 +15,7 @@ namespace NKikimr::NStorage {
         const TActorId RequestSessionId;

         bool IsScepterlessOperation = false;
+        bool CheckSyncersAfterCommit = false;

         TActorId ParentId;
         ui32 WaitingReplyFromNode = 0;
@@ -129,6 +130,8 @@ namespace NKikimr::NStorage {
         NKikimrBlobStorage::TStorageConfig GetSwitchBridgeNewConfig(const NKikimrBridge::TClusterState& newClusterState);
         bool CheckSwitchBridgeCommand();

+        void NotifyBridgeSyncFinished(const TQuery::TNotifyBridgeSyncFinished& cmd);
+
         ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
         // Configuration proposition

diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp
index aa1f7dcb822..a452dfc7a1d 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp
@@ -32,7 +32,30 @@ namespace NKikimr::NStorage {
         Y_ABORT_UNLESS(current.PerPileStateSize() == numPiles);
         ui32 numDifferent = 0;
         for (ui32 i = 0; i < numPiles; ++i) {
-            numDifferent += current.GetPerPileState(i) != newClusterState.GetPerPileState(i);
+            const auto currentState = current.GetPerPileState(i);
+            const auto newState = newClusterState.GetPerPileState(i);
+            if (currentState != newState) {
+                ++numDifferent;
+                switch (newState) {
+                    case NKikimrBridge::TClusterState::DISCONNECTED:
+                        // valid transition from any state
+                        break;
+
+                    case NKikimrBridge::TClusterState::SYNCHRONIZED:
+                        // invalid transition from any state
+                        return FinishWithError(TResult::ERROR, "can't switch to SYNCHRONIZED directly");
+
+                    case NKikimrBridge::TClusterState::NOT_SYNCHRONIZED:
+//                        if (currentState == NKikimrBridge::TClusterState::SYNCHRONIZED) {
+//                            return FinishWithError(TResult::ERROR, "invalid transition from SYNCHRONIZED to NOT_SYNCHRONIZED");
+//                        }
+                        break;
+
+                    case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MIN_SENTINEL_DO_NOT_USE_:
+                    case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MAX_SENTINEL_DO_NOT_USE_:
+                        Y_ABORT();
+                }
+            }
         }
         if (numDifferent > 1) {
             return FinishWithError(TResult::ERROR, "too many state changes in new configuration");
@@ -51,7 +74,16 @@ namespace NKikimr::NStorage {
             const NKikimrBridge::TClusterState& newClusterState) {
         NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
         config.SetGeneration(config.GetGeneration() + 1);
-        config.MutableClusterState()->CopyFrom(newClusterState);
+        auto *clusterState = config.MutableClusterState();
+        size_t changedPileIndex = Max<size_t>();
+        for (size_t i = 0; i < clusterState->PerPileStateSize(); ++i) {
+            if (clusterState->GetPerPileState(i) != newClusterState.GetPerPileState(i)) {
+                Y_ABORT_UNLESS(changedPileIndex == Max<size_t>());
+                changedPileIndex = i;
+            }
+        }
+        Y_ABORT_UNLESS(changedPileIndex != Max<size_t>());
+        clusterState->CopyFrom(newClusterState);

         auto *history = config.MutableClusterStateHistory();
         auto *entry = history->AddUnsyncedEntries();
@@ -61,6 +93,59 @@ namespace NKikimr::NStorage {
             entry->AddUnsyncedPiles(i); // all piles are unsynced by default
         }

+        switch (clusterState->GetPerPileState(changedPileIndex)) {
+            case NKikimrBridge::TClusterState::DISCONNECTED:
+                // this pile is now disconnected, so there is no reason to synchronize it anymore
+                for (size_t i = 0; i < history->PileSyncStateSize(); ++i) {
+                    const auto& item = history->GetPileSyncState(i);
+                    if (item.GetBridgePileId() != changedPileIndex) {
+                        break;
+                    }
+                    history->MutablePileSyncState()->DeleteSubrange(i, 1);
+                    break;
+                }
+                break;
+
+            case NKikimrBridge::TClusterState::NOT_SYNCHRONIZED: {
+                // we are coming into the NOT_SYNCHRONIZED state; we have to build a list of all of our entities
+                NKikimrBridge::TClusterStateHistory::TPileSyncState *state = nullptr;
+                for (size_t i = 0; i < history->PileSyncStateSize(); ++i) {
+                    state = history->MutablePileSyncState(i);
+                    if (state->GetBridgePileId() == changedPileIndex) {
+                        state->ClearUnsyncedGroupIds();
+                        state->ClearUnsyncedBSC();
+                        break;
+                    } else {
+                        state = nullptr;
+                    }
+                }
+                if (!state) {
+                    state = history->AddPileSyncState();
+                    state->SetBridgePileId(changedPileIndex);
+                }
+                state->SetUnsyncedBSC(true);
+                if (config.HasBlobStorageConfig()) {
+                    if (const auto& bsConfig = config.GetBlobStorageConfig(); bsConfig.HasServiceSet()) {
+                        const auto& ss = bsConfig.GetServiceSet();
+                        for (const auto& group : ss.GetGroups()) {
+                            if (group.BridgeGroupIdsSize()) {
+                                state->AddUnsyncedGroupIds(group.GetGroupID());
+                            }
+                        }
+                    }
+                }
+                CheckSyncersAfterCommit = true;
+                break;
+            }
+
+            case NKikimrBridge::TClusterState::SYNCHRONIZED:
+                Y_ABORT("invalid transition");
+
+            case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MIN_SENTINEL_DO_NOT_USE_:
+            case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MAX_SENTINEL_DO_NOT_USE_:
+                Y_ABORT();
+        }
+
         return config;
     }
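The two switches above pin down the legal per-pile transitions: any state may be switched to DISCONNECTED, SYNCHRONIZED is never reachable by direct request (only via sync completion through NotifyBridgeSyncFinished), and NOT_SYNCHRONIZED is otherwise accepted (a stricter SYNCHRONIZED-to-NOT_SYNCHRONIZED ban is present but commented out). The rule as a tiny standalone predicate, with an enum simplified from NKikimrBridge::TClusterState:

    #include <cassert>

    enum class EPileState { Disconnected, NotSynchronized, Synchronized };

    // May a configuration request move a pile from `from` to `to`?
    bool IsValidRequestedTransition(EPileState from, EPileState to) {
        switch (to) {
            case EPileState::Disconnected:
                return true; // valid from any state
            case EPileState::Synchronized:
                return false; // reachable only via sync completion, never by request
            case EPileState::NotSynchronized:
                return from != to; // the commit keeps a stricter check commented out
        }
        return false;
    }

    int main() {
        assert(IsValidRequestedTransition(EPileState::Synchronized, EPileState::Disconnected));
        assert(!IsValidRequestedTransition(EPileState::Disconnected, EPileState::Synchronized));
    }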
FinishWithError(TResult::ERROR, "BridgePileId out of bounds"); + } else if (Self->StorageConfig->GetClusterState().GetGeneration() != cmd.GetGeneration()) { + return FinishWithError(TResult::ERROR, "generation mismatch"); + } + + NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig; + config.SetGeneration(config.GetGeneration() + 1); + + auto *clusterState = config.MutableClusterState(); + auto *history = config.MutableClusterStateHistory(); + + size_t stateIndex; + NKikimrBridge::TClusterStateHistory::TPileSyncState *state = nullptr; + for (stateIndex = 0; stateIndex < history->PileSyncStateSize(); ++stateIndex) { + state = history->MutablePileSyncState(stateIndex); + if (state->GetBridgePileId() == cmd.GetBridgePileId()) { + break; + } else { + state = nullptr; + } + } + if (!state) { + return FinishWithError(TResult::ERROR, "unsynced pile not found"); + } + + switch (cmd.GetStatus()) { + case TQuery::TNotifyBridgeSyncFinished::Success: + if (cmd.HasBSC()) { + if (!cmd.GetBSC()) { + return FinishWithError(TResult::ERROR, "incorrect request"); + } + state->SetUnsyncedBSC(false); + } + if (cmd.HasGroupId()) { + auto *groups = state->MutableUnsyncedGroupIds(); + for (int i = 0; i < groups->size(); ++i) { + if (groups->at(i) == cmd.GetGroupId()) { + if (i != groups->size() - 1) { + groups->SwapElements(i, groups->size() - 1); + } + groups->RemoveLast(); + break; + } + } + } + if (cmd.UnsyncedGroupIdsToAddSize()) { + const auto& v = cmd.GetUnsyncedGroupIdsToAdd(); + state->MutableUnsyncedGroupIds()->Add(v.begin(), v.end()); + } + if (state->UnsyncedGroupIdsSize()) { + auto *groups = state->MutableUnsyncedGroupIds(); + std::ranges::sort(*groups); + const auto [first, last] = std::ranges::unique(*groups); + groups->erase(first, last); + } + if (!state->GetUnsyncedBSC() && !state->UnsyncedGroupIdsSize()) { + // fully synced, can switch to SYNCHRONIZED + history->MutablePileSyncState()->DeleteSubrange(stateIndex, 1); + clusterState->SetPerPileState(cmd.GetBridgePileId(), NKikimrBridge::TClusterState::SYNCHRONIZED); + } + break; + + case TQuery::TNotifyBridgeSyncFinished::TransientError: + break; + + case TQuery::TNotifyBridgeSyncFinished::PermanentError: { + bool found = false; + for (const auto& err : state->GetPermanentErrorReasons()) { + if (err == cmd.GetErrorReason()) { + found = true; + break; + } + } + if (!found) { + state->AddPermanentErrorReasons(cmd.GetErrorReason()); + } + break; + } + + case NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_INT_MIN_SENTINEL_DO_NOT_USE_: + case NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_INT_MAX_SENTINEL_DO_NOT_USE_: + Y_ABORT(); + } + + StartProposition(&config); + } + } // NKikimr::NStorage diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp index 0ff0d6b9852..aa0d145b80a 100644 --- a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp +++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp @@ -21,7 +21,7 @@ namespace NKikimr::NStorage { void TInvokeRequestHandlerActor::Bootstrap(TActorId parentId) { if (LifetimeToken.expired()) { - return FinishWithError(TResult::ERROR, "distributed config keeper terminated"); + return FinishWithError(TResult::RACE, "distributed config keeper terminated"); } STLOG(PRI_DEBUG, BS_NODE, NWDC42, 
"TInvokeRequestHandlerActor::Bootstrap", (Sender, Sender), (Cookie, Cookie), @@ -31,10 +31,10 @@ namespace NKikimr::NStorage { Become(&TThis::StateFunc); if (Self->ScepterlessOperationInProgress) { - FinishWithError(TResult::ERROR, "an operation is already in progress"); + FinishWithError(TResult::RACE, "an operation is already in progress"); } else if (Self->Binding) { if (RequestSessionId) { - FinishWithError(TResult::ERROR, "no double-hop invokes allowed"); + FinishWithError(TResult::RACE, "no double-hop invokes allowed"); } else { const ui32 root = Self->Binding->RootNodeId; Send(MakeBlobStorageNodeWardenID(root), Event->Release(), IEventHandle::FlagSubscribeOnSession); @@ -148,6 +148,9 @@ namespace NKikimr::NStorage { case TQuery::kGetStateStorageConfig: return GetStateStorageConfig(); + + case TQuery::kNotifyBridgeSyncFinished: + return NotifyBridgeSyncFinished(record.GetNotifyBridgeSyncFinished()); } FinishWithError(TResult::ERROR, "unhandled request"); @@ -270,6 +273,9 @@ namespace NKikimr::NStorage { if (auto error = Self->ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) { return error; } + if (CheckSyncersAfterCommit) { + InvokeOtherActor(*Self, &TDistributedConfigKeeper::IssueQuerySyncers); + } Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie); return std::nullopt; }; @@ -299,13 +305,13 @@ namespace NKikimr::NStorage { if (!Self->StorageConfig) { FinishWithError(TResult::ERROR, "no agreed StorageConfig"); } else if (Self->CurrentProposedStorageConfig) { - FinishWithError(TResult::ERROR, "config proposition request in flight"); + FinishWithError(TResult::RACE, "config proposition request in flight"); } else if (Self->RootState != (IsScepterlessOperation ? ERootState::INITIAL : ERootState::RELAX)) { - FinishWithError(TResult::ERROR, "something going on with default FSM"); + FinishWithError(TResult::RACE, "something going on with default FSM"); } else if (auto error = ValidateConfig(*Self->StorageConfig)) { FinishWithError(TResult::ERROR, TStringBuilder() << "current config validation failed: " << *error); } else if (IsScepterExpired()) { - FinishWithError(TResult::ERROR, "scepter lost during query execution"); + FinishWithError(TResult::RACE, "scepter lost during query execution"); } else { return true; } diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h index 7868396cc12..1dd02772552 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_events.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h @@ -167,7 +167,7 @@ namespace NKikimr::NStorage { }; struct TEvNodeWardenUpdateConfigFromPeer - : TEventLocal<TEvNodeWardenUpdateConfigFromPeer, TEvBlobStorage::EvNodeWardenUpdateConfigFromPeer> + : TEventLocal<TEvNodeWardenUpdateConfigFromPeer, TEvBlobStorage::EvNodeWardenUpdateConfigFromPeer> { NKikimrBlobStorage::TStorageConfig StorageConfig; @@ -176,4 +176,33 @@ namespace NKikimr::NStorage { {} }; + struct TEvNodeWardenManageSyncers + : TEventLocal<TEvNodeWardenManageSyncers, TEvBlobStorage::EvNodeWardenManageSyncers> + { + struct TSyncer { + ui32 NodeId; + TGroupId GroupId; + TBridgePileId TargetBridgePileId; + }; + std::vector<TSyncer> RunSyncers; + + TEvNodeWardenManageSyncers(std::vector<TSyncer>&& runSyncers) + : RunSyncers(std::move(runSyncers)) + {} + }; + + struct TEvNodeWardenManageSyncersResult + : TEventLocal<TEvNodeWardenManageSyncersResult, TEvBlobStorage::EvNodeWardenManageSyncersResult> + { + struct TSyncer { + TGroupId GroupId; + 
+
+    struct TEvNodeWardenManageSyncersResult
+        : TEventLocal<TEvNodeWardenManageSyncersResult, TEvBlobStorage::EvNodeWardenManageSyncersResult>
+    {
+        struct TSyncer {
+            TGroupId GroupId;
+            TBridgePileId TargetBridgePileId;
+        };
+        std::vector<TSyncer> WorkingSyncers;
+
+        TEvNodeWardenManageSyncersResult(std::vector<TSyncer>&& workingSyncers)
+            : WorkingSyncers(std::move(workingSyncers))
+        {}
+    };
+
 } // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
index 0a12d33a4ec..b42c740bdd0 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
@@ -2,6 +2,8 @@
 #include "node_warden_impl.h"
 #include "node_warden_events.h"

+#include <ydb/core/blobstorage/bridge/syncer/syncer.h>
+
 #include <ydb/core/blob_depot/agent/agent.h>

 #include <ydb/core/util/random.h>
@@ -238,6 +240,9 @@ namespace NKikimr::NStorage {
             for (auto& vdisk : group.VDisksOfGroup) {
                 UpdateGroupInfoForDisk(vdisk, info);
             }
+            for (const auto& [targetBridgePileId, actorId] : group.WorkingSyncers) {
+                Send(actorId, new TEvBlobStorage::TEvConfigureProxy(info, nullptr, nullptr));
+            }
         }

         if (const auto it = GroupPendingQueue.find(groupId); it != GroupPendingQueue.end()) {
@@ -318,4 +323,38 @@ namespace NKikimr::NStorage {
         }
     }

+    void TNodeWarden::HandleManageSyncers(TEvNodeWardenManageSyncers::TPtr ev) {
+        for (const auto& item : ev->Get()->RunSyncers) {
+            const ui32 groupId = item.GroupId.GetRawId();
+
+            if (item.NodeId == LocalNodeId) {
+                auto& group = Groups[groupId];
+                if (TActorId& actorId = group.WorkingSyncers[item.TargetBridgePileId]; !actorId) {
+                    STLOG(PRI_DEBUG, BS_NODE, NW64, "starting syncer actor", (GroupId, item.GroupId),
+                        (TargetBridgePileId, item.TargetBridgePileId));
+                    actorId = Register(NBridge::CreateSyncerActor(NeedGroupInfo(groupId), item.TargetBridgePileId));
+                }
+            } else if (const auto it = Groups.find(groupId); it != Groups.end()) {
+                TGroupRecord& group = it->second;
+                if (const auto jt = group.WorkingSyncers.find(item.TargetBridgePileId); jt != group.WorkingSyncers.end()) {
+                    STLOG(PRI_DEBUG, BS_NODE, NW64, "stopping syncer actor", (GroupId, item.GroupId),
+                        (TargetBridgePileId, item.TargetBridgePileId));
+                    TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, jt->second, SelfId(), nullptr, 0));
+                    group.WorkingSyncers.erase(jt);
+                }
+            }
+        }
+
+        std::vector<TEvNodeWardenManageSyncersResult::TSyncer> workingSyncers;
+        for (const auto& [groupId, group] : Groups) {
+            for (const auto& [targetBridgePileId, actorId] : group.WorkingSyncers) {
+                workingSyncers.push_back({
+                    .GroupId = TGroupId::FromValue(groupId),
+                    .TargetBridgePileId = targetBridgePileId,
+                });
+            }
+        }
+        Send(ev->Sender, new TEvNodeWardenManageSyncersResult(std::move(workingSyncers)), 0, ev->Cookie);
+    }
+
 } // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index f7820eb3a36..d73b63d32b7 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -176,6 +176,8 @@ STATEFN(TNodeWarden::StateOnline) {

         hFunc(TEvNodeWardenQueryCacheResult, Handle);

+        hFunc(TEvNodeWardenManageSyncers, HandleManageSyncers);
+
         default:
             EnqueuePendingMessage(ev);
             break;
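TNodeWarden::HandleManageSyncers() above treats the per-group WorkingSyncers map as the source of truth: starting means filling an empty map slot with a freshly registered actor, stopping means poisoning the recorded actor and erasing the slot. Schematically, with opaque handles instead of real actor ids (Register/Poison are hypothetical stand-ins):

    #include <cstdint>
    #include <unordered_map>

    using TPileId = uint32_t;
    using THandle = uint64_t; // 0 means "no actor", like a default TActorId

    static THandle Counter = 0;
    THandle Register() { return ++Counter; } // stand-in: spawn the syncer actor
    void Poison(THandle) {}                  // stand-in: send TEvents::TSystem::Poison

    std::unordered_map<TPileId, THandle> WorkingSyncers;

    void EnsureRunning(TPileId pile) {
        if (THandle& h = WorkingSyncers[pile]; !h) { // default-constructed slot: not running yet
            h = Register();
        }
    }

    void EnsureStopped(TPileId pile) {
        if (const auto it = WorkingSyncers.find(pile); it != WorkingSyncers.end()) {
            Poison(it->second);
            WorkingSyncers.erase(it);
        }
    }

    int main() {
        EnsureRunning(1);
        EnsureRunning(1); // idempotent: the actor is already there
        EnsureStopped(1);
    }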
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 93f24586f93..9eacb0aa3a9 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -23,6 +23,7 @@ namespace NKikimr::NStorage {
     struct TEvNodeWardenNotifyConfigMismatch;
     struct TEvNodeWardenWriteMetadata;
     struct TEvNodeWardenQueryCacheResult;
+    struct TEvNodeWardenManageSyncers;

     constexpr ui32 ProxyConfigurationTimeoutMilliseconds = 200;

     constexpr TDuration BackoffMin = TDuration::MilliSeconds(20);
@@ -526,6 +527,7 @@ namespace NKikimr::NStorage {
         TActorId GroupResolver; // resolver actor id
         TIntrusiveList<TVDiskRecord, TGroupRelationTag> VDisksOfGroup;
         TNodeLayoutInfoPtr NodeLayoutInfo;
+        THashMap<TBridgePileId, TActorId> WorkingSyncers;
     };
     std::unordered_map<ui32, TGroupRecord> Groups;
@@ -630,6 +632,8 @@ namespace NKikimr::NStorage {

     void Handle(NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate::TPtr ev);

+    void HandleManageSyncers(TAutoPtr<TEventHandle<TEvNodeWardenManageSyncers>> ev);
+
     ////////////////////////////////////////////////////////////////////////////////////////////////////////////////

     TActorId DistributedConfigKeeperId;
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index f1d42dd2f80..bfa05005083 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -6,6 +6,7 @@ SRCS(
     distconf.cpp
     distconf.h
     distconf_binding.cpp
+    distconf_bridge.cpp
    distconf_cache.cpp
     distconf_connectivity.cpp
     distconf_console.cpp
@@ -45,6 +46,7 @@ PEERDIR(
     library/cpp/openssl/crypto
     ydb/core/base
     ydb/core/blob_depot/agent
+    ydb/core/blobstorage/bridge/syncer
     ydb/core/blobstorage/common
     ydb/core/blobstorage/crypto
     ydb/core/blobstorage/dsproxy/bridge
diff --git a/ydb/core/blobstorage/ya.make b/ydb/core/blobstorage/ya.make
index 807b6abe948..d0927adddfa 100644
--- a/ydb/core/blobstorage/ya.make
+++ b/ydb/core/blobstorage/ya.make
@@ -31,6 +31,7 @@ END()
 RECURSE(
     backpressure
     base
+    bridge
     crypto
     dsproxy
     groupinfo
diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp
index f2c54c90382..36bbe96b256 100644
--- a/ydb/core/mind/bscontroller/bsc.cpp
+++ b/ydb/core/mind/bscontroller/bsc.cpp
@@ -316,11 +316,38 @@ void TBlobStorageController::ApplyBscSettings(const NKikimrConfig::TBlobStorageC

     updateSettings->CopyFrom(bsConfig.GetBscSettings());

-    STLOG(PRI_DEBUG, BS_CONTROLLER, BSC17, "ApplyBSCSettings", (Request, r));
+    STLOG(PRI_DEBUG, BS_CONTROLLER, BSC39, "ApplyBSCSettings", (Request, r));
     Send(SelfId(), ev.release());
 }

 void TBlobStorageController::ApplyStorageConfig(bool ignoreDistconf) {
+    InvokeOnRootTimer.Reset();
+    InvokeOnRootCmd.reset();
+
+    if (StorageConfig->HasClusterStateHistory()) {
+        const auto& history = StorageConfig->GetClusterStateHistory();
+        for (const auto& unsynced : history.GetPileSyncState()) {
+            if (unsynced.GetUnsyncedBSC()) {
+                auto ev = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
+                auto& record = ev->Record;
+                auto *nbsf = record.MutableNotifyBridgeSyncFinished();
+                nbsf->SetGeneration(StorageConfig->GetClusterState().GetGeneration());
+                nbsf->SetBridgePileId(unsynced.GetBridgePileId());
+                using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TNotifyBridgeSyncFinished;
+                nbsf->SetStatus(TQuery::Success);
+                nbsf->SetBSC(true);
+                for (const auto& [groupId, info] : GroupMap) {
+                    if (info->BridgeGroupInfo) {
+                        groupId.CopyToProto(nbsf, &TQuery::AddUnsyncedGroupIdsToAdd);
+                    }
+                }
+                // remember the command in case it fails
+                InvokeOnRootCmd.emplace(record);
+                Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), ev.release());
+            }
+        }
+    }
+
     if (!StorageConfig->HasBlobStorageConfig()) {
         return;
     }
@@ -426,6 +453,24 @@ void TBlobStorageController::ApplyStorageConfig(bool ignoreDistconf) {
     }
 }

+void TBlobStorageController::Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr ev) {
+    const auto status = ev->Get()->Record.GetStatus();
+    const bool success = status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK;
+    const bool retriable =
+        status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::RACE ||
+        status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::NO_QUORUM;
+    STLOG(retriable ? PRI_INFO : success ? PRI_DEBUG : PRI_WARN, BS_CONTROLLER, BSC38, "TEvNodeConfigInvokeOnRootResult",
+        (Response, ev->Get()->Record), (Success, success), (Retriable, retriable));
+    if (success) {
+        InvokeOnRootCmd.reset();
+    } else if (retriable && InvokeOnRootCmd) {
+        auto ev = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
+        ev->Record.CopyFrom(*InvokeOnRootCmd);
+        TActivationContext::Schedule(TDuration::MilliSeconds(InvokeOnRootTimer.NextBackoffMs()), new IEventHandle(
+            MakeBlobStorageNodeWardenID(SelfId().NodeId()), SelfId(), ev.release()));
+    }
+}
+
 void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev) {
     auto& record = ev->Get()->Record;
     auto& response = record.GetResponse();
@@ -699,6 +744,7 @@ STFUNC(TBlobStorageController::StateWork) {
         fFunc(TEvBlobStorage::EvControllerShredRequest, EnqueueIncomingEvent);
         cFunc(TEvPrivate::EvUpdateShredState, ShredState.HandleUpdateShredState);
         cFunc(TEvPrivate::EvCommitMetrics, CommitMetrics);
+        hFunc(NStorage::TEvNodeConfigInvokeOnRootResult, Handle);
         default:
             if (!HandleDefaultEvents(ev, SelfId())) {
                 STLOG(PRI_ERROR, BS_CONTROLLER, BSC06, "StateWork unexpected event", (Type, type),
diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h
index b45888a60eb..e5e1d1a172f 100644
--- a/ydb/core/mind/bscontroller/impl.h
+++ b/ydb/core/mind/bscontroller/impl.h
@@ -11,6 +11,8 @@
 #include "self_heal.h"
 #include "storage_pool_stat.h"

+#include <ydb/core/blobstorage/nodewarden/node_warden_events.h>
+
 #include <util/generic/hash_multi_map.h>

 inline IOutputStream& operator <<(IOutputStream& o, NKikimr::TErasureType::EErasureSpecies p) {
@@ -1639,6 +1641,9 @@ private:
     class TConsoleInteraction;
     std::unique_ptr<TConsoleInteraction> ConsoleInteraction;

+    TBackoffTimer InvokeOnRootTimer{10, 3000};
+    std::optional<NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot> InvokeOnRootCmd;
+
     struct TEvPrivate {
         enum EEv {
             EvUpdateSystemViews = EventSpaceBegin(TEvents::ES_PRIVATE),
@@ -1850,6 +1855,7 @@ private:
     bool HostConfigEquals(const THostConfigInfo& left, const NKikimrBlobStorage::TDefineHostConfig& right) const;
     void ApplyBscSettings(const NKikimrConfig::TBlobStorageConfig& bsConfig);
    void ApplyStorageConfig(bool ignoreDistconf = false);
+    void Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr ev);
     void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev);
     void Handle(TEvBlobStorage::TEvControllerDistconfRequest::TPtr ev);
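The new BSC handler retries NotifyBridgeSyncFinished only for RACE and NO_QUORUM outcomes, with TBackoffTimer{10, 3000} bounding the re-send delay. A compact model of that classification and a bounded doubling backoff (only the 10/3000 bounds come from the diff; the exact TBackoffTimer policy is an assumption):

    #include <algorithm>
    #include <cstdint>

    enum class EStatus { OK, ERROR, RACE, NO_QUORUM };

    struct TBackoff {
        uint64_t Min, Max, Current = 0;
        uint64_t NextMs() {
            Current = Current ? std::min(Current * 2, Max) : Min;
            return Current;
        }
    };

    bool ShouldRetry(EStatus s) {
        return s == EStatus::RACE || s == EStatus::NO_QUORUM; // transient races only
    }

    int main() {
        TBackoff backoff{10, 3000};
        if (ShouldRetry(EStatus::RACE)) {
            (void)backoff.NextMs(); // schedule the re-send after this many milliseconds
        }
    }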
diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp
index 360eadc5aa7..7f9bbe66a4a 100644
--- a/ydb/core/mind/bscontroller/register_node.cpp
+++ b/ydb/core/mind/bscontroller/register_node.cpp
@@ -383,12 +383,12 @@ public:
             if (!record.HasStorageConfigVersion() || record.GetStorageConfigVersion() < configVersion) {
                 yamlConfig->SetCompressedStorageConfig(CompressStorageYamlConfig(*Self->StorageYamlConfig));
             } else if (configVersion < record.GetStorageConfigVersion()) {
-                STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN09, "storage config version on node is greater than one known to BSC",
+                STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN10, "storage config version on node is greater than one known to BSC",
                     (NodeId, record.GetNodeID()), (NodeVersion, record.GetMainConfigVersion()),
                     (StoredVersion, configVersion));
             } else if (record.GetStorageConfigHash() != Self->StorageYamlConfigHash) {
-                STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN11, "node storage config hash mismatch",
+                STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN12, "node storage config hash mismatch",
                     (NodeId, record.GetNodeID()));
             }
         }
diff --git a/ydb/core/mind/bscontroller/shred.cpp b/ydb/core/mind/bscontroller/shred.cpp
index 243aadef31c..b8457281b0a 100644
--- a/ydb/core/mind/bscontroller/shred.cpp
+++ b/ydb/core/mind/bscontroller/shred.cpp
@@ -33,7 +33,7 @@ namespace NKikimr::NBsController {
         {}

         void Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr ev) {
-            STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "received TEvControllerShredRequest", (Record, ev->Get()->Record));
+            STLOG(PRI_DEBUG, BS_SHRED, BSSC00, "received TEvControllerShredRequest", (Record, ev->Get()->Record));
             Self->Execute(new TTxUpdateShred(this, ev));
         }
@@ -102,7 +102,7 @@ namespace NKikimr::NBsController {
         }

         for (auto& [nodeId, ev] : outbox) {
-            STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "issuing TEvControllerNodeServiceSetUpdate", (NodeId, nodeId),
+            STLOG(PRI_DEBUG, BS_SHRED, BSSC01, "issuing TEvControllerNodeServiceSetUpdate", (NodeId, nodeId),
                 (Record, ev->Record));
             Self->Send(MakeBlobStorageNodeWardenID(nodeId), ev.release());
         }
@@ -123,7 +123,7 @@ namespace NKikimr::NBsController {
         }

         void OnShredFinished(TPDiskId pdiskId, TPDiskInfo& pdiskInfo, ui64 generation, TTransactionContext& txc) {
-            STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "shred finished", (PDiskId, pdiskId), (Generation, generation),
+            STLOG(PRI_DEBUG, BS_SHRED, BSSC02, "shred finished", (PDiskId, pdiskId), (Generation, generation),
                 (ShredInProgress, pdiskInfo.ShredInProgress), (ShredComplete, pdiskInfo.ShredComplete),
                 (CurrentGeneration, GetCurrentGeneration()));
@@ -151,7 +151,7 @@ namespace NKikimr::NBsController {
         }

         void OnShredAborted(TPDiskId pdiskId, TPDiskInfo& pdiskInfo) {
-            STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "shred aborted", (PDiskId, pdiskId),
+            STLOG(PRI_DEBUG, BS_SHRED, BSSC03, "shred aborted", (PDiskId, pdiskId),
                 (ShredInProgress, pdiskInfo.ShredInProgress), (ShredComplete, pdiskInfo.ShredComplete));
             EndShredForPDisk(pdiskId, pdiskInfo);
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index 156fb6a5aa2..90fbe128f88 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -93,6 +93,12 @@ message TEvNodeConfigReversePush {
 message TEvNodeConfigUnbind {
 }

+message TStorageSyncerInfo {
+    uint32 NodeId = 1; // the node that runs the blobstorage syncer actor
+    uint32 GroupId = 2; // bridged group id
+    uint32 TargetBridgePileId = 3; // the target bridge pile the sync is aimed at
+}
+
 // Propagate query to the tree bottom and collect replies.
 message TEvNodeConfigScatter {
     message TCollectConfigs {
     }

     message TProposeStorageConfig {
         TStorageConfig Config = 1;
     }

+    message TManageSyncers {
+        repeated TStorageSyncerInfo RunSyncers = 1; // syncers expected to be running (suggests stopping the same ones on other nodes)
+    }
+
     optional uint64 Cookie = 1;
     optional fixed64 TaskId = 4; // random (and not necessarily unique) identifier

     oneof Request {
         TCollectConfigs CollectConfigs = 2;
         TProposeStorageConfig ProposeStorageConfig = 3;
+        TManageSyncers ManageSyncers = 5;
     }
 }
@@ -158,12 +169,21 @@ message TEvNodeConfigGather {
         repeated TStatus Status = 1;
     }

+    message TManageSyncers {
+        message TNode {
+            uint32 NodeId = 1;
+            repeated TStorageSyncerInfo Syncers = 2;
+        }
+        repeated TNode Nodes = 1;
+    }
+
     optional uint64 Cookie = 1;
     optional bool Aborted = 4;

     oneof Response {
         TCollectConfigs CollectConfigs = 2;
         TProposeStorageConfig ProposeStorageConfig = 3;
+        TManageSyncers ManageSyncers = 5;
     }
 }
@@ -244,6 +264,23 @@ message TEvNodeConfigInvokeOnRoot {
         NKikimrBridge.TClusterState NewClusterState = 1;
     }

+    message TNotifyBridgeSyncFinished {
+        enum EStatus {
+            Success = 0;
+            TransientError = 1;
+            PermanentError = 2;
+        }
+        uint64 Generation = 1; // generation matching ClusterState's one
+        uint32 BridgePileId = 2; // which bridge pile is marked synchronized
+        EStatus Status = 3; // status of this synchronization
+        optional string ErrorReason = 4; // textual description of an error
+        oneof Entity {
+            uint32 GroupId = 5;
+            bool BSC = 6;
+        }
+        repeated uint32 UnsyncedGroupIdsToAdd = 7;
+    }
+
     oneof Request {
         TUpdateConfig UpdateConfig = 1;
         TQueryConfig QueryConfig = 2;
@@ -258,6 +295,7 @@ message TEvNodeConfigInvokeOnRoot {
         TSwitchBridgeClusterState SwitchBridgeClusterState = 11;
         TStateStorageConfig ReconfigStateStorage = 12;
         TGetStateStorageConfig GetStateStorageConfig = 13;
+        TNotifyBridgeSyncFinished NotifyBridgeSyncFinished = 14;
     }
 }
diff --git a/ydb/core/protos/bridge.proto b/ydb/core/protos/bridge.proto
index 4182f5a8644..90e56c0d8bc 100644
--- a/ydb/core/protos/bridge.proto
+++ b/ydb/core/protos/bridge.proto
@@ -22,4 +22,12 @@ message TClusterStateHistory {
         repeated uint32 UnsyncedPiles = 3; // a list of pile ids that did not refer to this entry; trimmed when empty
     }
     repeated TUnsyncedEntry UnsyncedEntries = 1; // entries stored in order of ascending Generation
+
+    message TPileSyncState {
+        uint32 BridgePileId = 1;
+        repeated string PermanentErrorReasons = 2; // a set of detected permanent errors for this pile
+        repeated uint32 UnsyncedGroupIds = 3; // a set of unsynced groups for this pile
+        bool UnsyncedBSC = 4; // true if BSC is not synced with this configuration yet
+    }
+    repeated TPileSyncState PileSyncState = 2; // per-pile sync state for current generation
 }
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index d3cc385c4b6..d59c5d3b1bd 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -58,6 +58,7 @@ enum EServiceKikimr {
     BS_PROXY_CHECKINTEGRITY = 2603;
     BS_PROXY_BRIDGE = 2604;
     BS_CLUSTER_BALANCING = 2605;
+    BS_BRIDGE_SYNC = 2606;

     // DATASHARD section
     // TX_DATASHARD = 290;
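For reference, the intended client flow for TNotifyBridgeSyncFinished is already visible in the ApplyStorageConfig() hunk above; condensed to its essence (a fragment, not a standalone program: it relies on the YDB event and proto types introduced in this diff, and clusterStateGeneration/pileId are placeholder variables):

    // Report to the distconf root that the BSC part of a pile's sync is done.
    auto ev = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
    auto *nbsf = ev->Record.MutableNotifyBridgeSyncFinished();
    using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TNotifyBridgeSyncFinished;
    nbsf->SetGeneration(clusterStateGeneration); // must match ClusterState's generation
    nbsf->SetBridgePileId(pileId);               // the pile being reported
    nbsf->SetStatus(TQuery::Success);
    nbsf->SetBSC(true);                          // the "BSC synced" entity flag
    Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), ev.release());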