author    Alexander Rutkovsky <alexvru@ydb.tech>    2025-07-02 12:17:53 +0300
committer GitHub <noreply@github.com>    2025-07-02 09:17:53 +0000
commit    f0176285cf0af5c49d3d5ea352d57352bfa02751 (patch)
tree      651532ff02366d07c4cbd803741027240d3b7973
parent    bf7fd3e4df3d40b0355f29894aa78709f084c749 (diff)
download  ydb-f0176285cf0af5c49d3d5ea352d57352bfa02751.tar.gz
Introduce BlobStorage syncer machinery for Bridge mode (#20423)
-rw-r--r--  ydb/core/base/blobstorage.h | 14
-rw-r--r--  ydb/core/base/id_wrapper.h | 3
-rw-r--r--  ydb/core/blobstorage/bridge/defs.h | 3
-rw-r--r--  ydb/core/blobstorage/bridge/syncer/defs.h | 3
-rw-r--r--  ydb/core/blobstorage/bridge/syncer/syncer.cpp | 227
-rw-r--r--  ydb/core/blobstorage/bridge/syncer/syncer.h | 11
-rw-r--r--  ydb/core/blobstorage/bridge/syncer/syncer_impl.h | 71
-rw-r--r--  ydb/core/blobstorage/bridge/syncer/ya.make | 12
-rw-r--r--  ydb/core/blobstorage/bridge/ya.make | 3
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf.cpp | 1
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf.h | 25
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf_binding.cpp | 2
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf_bridge.cpp | 202
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp | 2
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf_fsm.cpp | 48
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf_invoke.h | 3
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp | 182
-rw-r--r--  ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp | 18
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_events.h | 31
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_group.cpp | 39
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_impl.cpp | 2
-rw-r--r--  ydb/core/blobstorage/nodewarden/node_warden_impl.h | 4
-rw-r--r--  ydb/core/blobstorage/nodewarden/ya.make | 2
-rw-r--r--  ydb/core/blobstorage/ya.make | 1
-rw-r--r--  ydb/core/mind/bscontroller/bsc.cpp | 48
-rw-r--r--  ydb/core/mind/bscontroller/impl.h | 6
-rw-r--r--  ydb/core/mind/bscontroller/register_node.cpp | 4
-rw-r--r--  ydb/core/mind/bscontroller/shred.cpp | 8
-rw-r--r--  ydb/core/protos/blobstorage_distributed_config.proto | 38
-rw-r--r--  ydb/core/protos/bridge.proto | 8
-rw-r--r--  ydb/library/services/services.proto | 1
31 files changed, 991 insertions, 31 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 9ab3bb4ce8a..bda07034c5c 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -928,6 +928,8 @@ struct TEvBlobStorage {
EvNodeWardenUnsubscribeFromCache,
EvNodeWardenNotifyConfigMismatch,
EvNodeWardenUpdateConfigFromPeer,
+ EvNodeWardenManageSyncers,
+ EvNodeWardenManageSyncersResult,
// Other
EvRunActor = EvPut + 15 * 512,
@@ -2570,6 +2572,10 @@ struct TEvBlobStorage {
void Output(IOutputStream& s) const {
s << "{" << TabletId << "=>" << BlockedGeneration << "}";
}
+
+ auto GetKey() const {
+ return std::tie(TabletId);
+ }
};
struct TBarrier {
@@ -2605,6 +2611,10 @@ struct TEvBlobStorage {
Hard.Output(s);
s << "}";
}
+
+ auto GetKey() const {
+ return std::tie(TabletId, Channel);
+ }
};
struct TBlob {
@@ -2627,6 +2637,10 @@ struct TEvBlobStorage {
s << "d";
}
}
+
+ auto GetKey() const {
+ return std::tie(Id);
+ }
};
NKikimrProto::EReplyStatus Status;
diff --git a/ydb/core/base/id_wrapper.h b/ydb/core/base/id_wrapper.h
index de3f4c8842f..9f77b93fdba 100644
--- a/ydb/core/base/id_wrapper.h
+++ b/ydb/core/base/id_wrapper.h
@@ -78,6 +78,9 @@ public:
T GetRawId() const { return Raw; }
+ static const TIdWrapper Min() { return FromValue(::Min<T>()); }
+ static const TIdWrapper Max() { return FromValue(::Max<T>()); }
+
friend std::hash<TIdWrapper<T, Tag>>;
friend THash<TIdWrapper<T, Tag>>;
diff --git a/ydb/core/blobstorage/bridge/defs.h b/ydb/core/blobstorage/bridge/defs.h
new file mode 100644
index 00000000000..d067528eb9e
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/defs.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include <ydb/core/blobstorage/defs.h>
diff --git a/ydb/core/blobstorage/bridge/syncer/defs.h b/ydb/core/blobstorage/bridge/syncer/defs.h
new file mode 100644
index 00000000000..735ee3e1ae5
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/defs.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include <ydb/core/blobstorage/bridge/defs.h>
diff --git a/ydb/core/blobstorage/bridge/syncer/syncer.cpp b/ydb/core/blobstorage/bridge/syncer/syncer.cpp
new file mode 100644
index 00000000000..9481928f4b3
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/syncer.cpp
@@ -0,0 +1,227 @@
+#include "syncer.h"
+#include "syncer_impl.h"
+#include <ydb/core/util/stlog.h>
+
+namespace NKikimr::NStorage::NBridge {
+
+ TSyncerActor::TSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId)
+ : Info(std::move(info))
+ , TargetPileId(targetPileId)
+ {
+ Y_ABORT_UNLESS(!Info || Info->IsBridged());
+ }
+
+ void TSyncerActor::Bootstrap() {
+ LogId = TStringBuilder() << SelfId();
+ STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS00, "bootstrapping bridged blobstorage syncer", (LogId, LogId),
+ (TargetPileId, TargetPileId));
+ Become(&TThis::StateFunc);
+ if (Info) {
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true));
+ }
+ }
+
+ void TSyncerActor::PassAway() {
+ TActorBootstrapped::PassAway();
+ }
+
+ void TSyncerActor::Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev) {
+ if (!Info) {
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenQueryStorageConfig(true));
+ }
+ Info = std::move(ev->Get()->Info);
+ Y_ABORT_UNLESS(Info);
+ Y_ABORT_UNLESS(Info->IsBridged());
+ }
+
+ void TSyncerActor::Terminate(std::optional<TString> errorReason) {
+ STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS04, "syncing finished", (LogId, LogId), (ErrorReason, errorReason));
+ PassAway();
+ }
+
+ void TSyncerActor::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
+ const bool initial = !BridgeInfo;
+ BridgeInfo = ev->Get()->BridgeInfo;
+ Y_ABORT_UNLESS(BridgeInfo);
+ const auto& targetPile = *BridgeInfo->GetPile(TargetPileId);
+ if (targetPile.State != NKikimrBridge::TClusterState::NOT_SYNCHRONIZED) {
+            // there is absolutely no need for synchronization: the pile is either marked synchronized (which would be strange),
+            // or it is disconnected; we terminate in either case
+ return Terminate("target pile is not NOT_SYNCHRONIZED anymore");
+ }
+ if (!initial && BridgeInfo->GetPile(SourcePileId)->State != NKikimrBridge::TClusterState::SYNCHRONIZED) {
+ return Terminate("source pile is not SYNCHRONIZED anymore");
+ }
+ if (initial) {
+ InitiateSync();
+ }
+ }
+
+ void TSyncerActor::InitiateSync() {
+        // remember our source pile (the primary one) on first start; actually, any pile in SYNCHRONIZED state would do
+ Y_ABORT_UNLESS(BridgeInfo->PrimaryPile->State == NKikimrBridge::TClusterState::SYNCHRONIZED);
+ SourcePileId = BridgeInfo->PrimaryPile->BridgePileId;
+ const auto& groups = Info->GetBridgeGroupIds();
+ Y_ABORT_UNLESS(groups.size() == std::size(BridgeInfo->Piles));
+ SourceGroupId = groups[SourcePileId.GetRawId()];
+ TargetGroupId = groups[TargetPileId.GetRawId()];
+ LogId = TStringBuilder() << LogId << '{' << SourceGroupId << "->" << TargetGroupId << '}';
+ SourceState = &GroupAssimilateState[SourceGroupId];
+ TargetState = &GroupAssimilateState[TargetGroupId];
+ IssueAssimilateRequest(SourceGroupId);
+ IssueAssimilateRequest(TargetGroupId);
+ STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS01, "initiating sync", (LogId, LogId), (SourceGroupId, SourceGroupId),
+ (TargetGroupId, TargetGroupId));
+ }
+
+ void TSyncerActor::DoMergeLoop() {
+ if (SourceState->RequestInFlight || TargetState->RequestInFlight) {
+ return; // nothing to merge yet
+ }
+
+ auto mergeBlocks = [&](auto *sourceItem, auto *targetItem) {
+ STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS05, "merging block", (LogId, LogId), (SourceItem, sourceItem),
+ (TargetItem, targetItem));
+ };
+
+ auto mergeBarriers = [&](auto *sourceItem, auto *targetItem) {
+ STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS06, "merging barrier", (LogId, LogId), (SourceItem, sourceItem),
+ (TargetItem, targetItem));
+ };
+
+ auto mergeBlobs = [&](auto *sourceItem, auto *targetItem) {
+ STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS03, "merging blob", (LogId, LogId), (SourceItem, sourceItem),
+ (TargetItem, targetItem));
+ };
+
+#define MERGE(NAME) \
+ if (!DoMergeEntities(SourceState->NAME, TargetState->NAME, SourceState->NAME##Finished, TargetState->NAME##Finished, merge##NAME)) { \
+ return; \
+ }
+ MERGE(Blocks)
+ MERGE(Barriers)
+ MERGE(Blobs)
+#undef MERGE
+ }
+
+ template<typename T, typename TCallback>
+ bool TSyncerActor::DoMergeEntities(std::deque<T>& source, std::deque<T>& target, bool sourceFinished, bool targetFinished,
+ TCallback&& merge) {
+ while ((!source.empty() || sourceFinished) && (!target.empty() || targetFinished)) {
+ auto *sourceItem = source.empty() ? nullptr : &source.front();
+ auto *targetItem = target.empty() ? nullptr : &target.front();
+            if (!sourceItem && !targetItem) { // both queues are exhausted
+ Y_ABORT_UNLESS(sourceFinished && targetFinished);
+ return true;
+ } else if (sourceItem && targetItem) { // we have items in both queues, have to pick one according to key
+ const auto& sourceKey = sourceItem->GetKey();
+ const auto& targetKey = targetItem->GetKey();
+ if (sourceKey < targetKey) {
+ targetItem = nullptr;
+ } else if (targetKey < sourceKey) {
+ sourceItem = nullptr;
+ }
+ }
+ merge(sourceItem, targetItem);
+ if (sourceItem) {
+ source.pop_front();
+ }
+ if (targetItem) {
+ target.pop_front();
+ }
+ }
+
+ if (!sourceFinished && source.empty()) {
+ IssueAssimilateRequest(SourceGroupId);
+ }
+ if (!targetFinished && target.empty()) {
+ IssueAssimilateRequest(TargetGroupId);
+ }
+
+ return false;
+ }
+
+ void TSyncerActor::IssueAssimilateRequest(TGroupId groupId) {
+ const auto it = GroupAssimilateState.find(groupId);
+ Y_ABORT_UNLESS(it != GroupAssimilateState.end());
+ TAssimilateState& state = it->second;
+
+ Y_ABORT_UNLESS(!state.BlocksFinished || !state.BarriersFinished || !state.BlobsFinished);
+
+ STLOG(PRI_DEBUG, BS_BRIDGE_SYNC, BRSS02, "issuing assimilate request", (LogId, LogId), (GroupId, groupId),
+ (SkipBlocksUpTo, state.SkipBlocksUpTo ? ToString(*state.SkipBlocksUpTo) : "<none>"),
+ (SkipBarriersUpTo, state.SkipBarriersUpTo ? TString(TStringBuilder() << '[' <<
+ std::get<0>(*state.SkipBarriersUpTo) << ':' << std::get<1>(*state.SkipBarriersUpTo) << ']') : "<none>"),
+ (SkipBlobsUpTo, state.SkipBlobsUpTo ? state.SkipBlobsUpTo->ToString() : "<none>"));
+
+ SendToBSProxy(SelfId(), it->first, new TEvBlobStorage::TEvAssimilate(state.SkipBlocksUpTo, state.SkipBarriersUpTo,
+ state.SkipBlobsUpTo, /*ignoreDecommitState=*/ true, /*reverse=*/ true), groupId.GetRawId());
+
+ Y_ABORT_UNLESS(!state.RequestInFlight);
+ state.RequestInFlight = true;
+ }
+
+ void TSyncerActor::Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev) {
+ const auto groupId = TGroupId::FromValue(ev->Cookie);
+
+ const auto it = GroupAssimilateState.find(groupId);
+ Y_ABORT_UNLESS(it != GroupAssimilateState.end());
+ TAssimilateState& state = it->second;
+
+ Y_ABORT_UNLESS(state.RequestInFlight);
+ state.RequestInFlight = false;
+
+ auto& msg = *ev->Get();
+
+ if (!state.BlocksFinished && (msg.Blocks.empty() || !msg.Barriers.empty() || !msg.Blobs.empty())) {
+ state.BlocksFinished = true;
+            state.SkipBlocksUpTo.emplace(); // the last value, as we are going in reverse
+        } else if (!state.BlocksFinished) { // guard: blocks may already be finished, msg.Blocks can be empty
+ state.SkipBlocksUpTo.emplace(msg.Blocks.back().TabletId);
+ }
+ if (state.BlocksFinished) {
+ if (!state.BarriersFinished && (msg.Barriers.empty() || !msg.Blobs.empty())) {
+ state.BarriersFinished = true;
+ state.SkipBarriersUpTo.emplace(); // the same logic for barriers
+            } else if (!state.BarriersFinished) { // guard: barriers may already be finished, msg.Barriers can be empty
+ auto& lastBarrier = msg.Barriers.back();
+ state.SkipBarriersUpTo.emplace(lastBarrier.TabletId, lastBarrier.Channel);
+ }
+ if (state.BarriersFinished && msg.Blobs.empty()) {
+ state.BlobsFinished = true;
+            } else if (!msg.Blobs.empty()) { // guard: msg.Blobs can be empty while barriers are still streaming
+ state.SkipBlobsUpTo.emplace(msg.Blobs.back().Id);
+ }
+ }
+
+ if (state.Blocks.empty()) {
+ state.Blocks = std::move(msg.Blocks);
+ } else {
+ state.Blocks.insert(state.Blocks.end(), msg.Blocks.begin(), msg.Blocks.end());
+ }
+ if (state.Barriers.empty()) {
+ state.Barriers = std::move(msg.Barriers);
+ } else {
+ state.Barriers.insert(state.Barriers.end(), msg.Barriers.begin(), msg.Barriers.end());
+ }
+ if (state.Blobs.empty()) {
+ state.Blobs = std::move(msg.Blobs);
+ } else {
+ state.Blobs.insert(state.Blobs.end(), msg.Blobs.begin(), msg.Blobs.end());
+ }
+
+ DoMergeLoop();
+ }
+
+ STRICT_STFUNC(TSyncerActor::StateFunc,
+ hFunc(TEvBlobStorage::TEvConfigureProxy, Handle)
+ hFunc(TEvBlobStorage::TEvAssimilateResult, Handle)
+ hFunc(TEvNodeWardenStorageConfig, Handle)
+ cFunc(TEvents::TSystem::Poison, PassAway)
+ )
+
+ IActor *CreateSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId) {
+ return new TSyncerActor(std::move(info), targetPileId);
+ }
+
+} // NKikimr::NStorage::NBridge
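The merge loop above treats the two assimilation streams as key-ordered queues walked in lockstep: when both fronts are present, the side holding the smaller key is processed first with the other pointer set to nullptr, and equal keys are passed together so the callback can compare source and target versions of the same entity. A minimal standalone model of this rule, with plain int keys and a free function in place of the member template (all names here are illustrative, not from the patch):

    #include <deque>
    #include <iostream>

    // Sketch of the DoMergeEntities walk: both queues are ordered by key;
    // merge() receives nullptr for the side that lacks the current key.
    template<typename T, typename TCallback>
    bool MergeEntities(std::deque<T>& source, std::deque<T>& target,
            bool sourceFinished, bool targetFinished, TCallback&& merge) {
        while ((!source.empty() || sourceFinished) && (!target.empty() || targetFinished)) {
            T *s = source.empty() ? nullptr : &source.front();
            T *t = target.empty() ? nullptr : &target.front();
            if (!s && !t) {
                return true; // both streams fully drained
            } else if (s && t) {
                if (*s < *t) {
                    t = nullptr; // source holds the smaller key, process it alone
                } else if (*t < *s) {
                    s = nullptr; // target holds the smaller key, process it alone
                }
            }
            merge(s, t);
            if (s) { source.pop_front(); }
            if (t) { target.pop_front(); }
        }
        return false; // one queue ran dry before its stream finished: fetch more
    }

    int main() {
        std::deque<int> src{1, 3, 4}, tgt{2, 3};
        MergeEntities(src, tgt, true, true, [](const int *s, const int *t) {
            std::cout << (s ? *s : 0) << '/' << (t ? *t : 0) << '\n';
        }); // prints 1/0, 0/2, 3/3, 4/0
    }

Returning false is what drives the refill path: in the actor above, DoMergeEntities then calls IssueAssimilateRequest for whichever side drained before its stream finished.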
diff --git a/ydb/core/blobstorage/bridge/syncer/syncer.h b/ydb/core/blobstorage/bridge/syncer/syncer.h
new file mode 100644
index 00000000000..cef450e6aba
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/syncer.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "defs.h"
+
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+
+namespace NKikimr::NStorage::NBridge {
+
+ IActor *CreateSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId);
+
+} // NKikimr::NStorage::NBridge
diff --git a/ydb/core/blobstorage/bridge/syncer/syncer_impl.h b/ydb/core/blobstorage/bridge/syncer/syncer_impl.h
new file mode 100644
index 00000000000..2cabbac822a
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/syncer_impl.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include "defs.h"
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+#include <ydb/core/base/blobstorage.h>
+#include <ydb/core/base/bridge.h>
+#include <ydb/core/blobstorage/base/blobstorage_events.h>
+#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
+
+#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> // for TEvConfigureProxy
+
+namespace NKikimr::NStorage::NBridge {
+
+ class TSyncerActor : public TActorBootstrapped<TSyncerActor> {
+ TIntrusivePtr<TBlobStorageGroupInfo> Info;
+ TBridgePileId TargetPileId;
+ TBridgePileId SourcePileId;
+ TGroupId SourceGroupId;
+ TGroupId TargetGroupId;
+ TBridgeInfo::TPtr BridgeInfo;
+ TString LogId;
+
+ public:
+ TSyncerActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TBridgePileId targetPileId);
+
+ void Bootstrap();
+ void PassAway() override;
+
+ void Handle(TEvBlobStorage::TEvConfigureProxy::TPtr ev);
+
+ void Terminate(std::optional<TString> errorReason);
+
+ STFUNC(StateFunc);
+
+ void Handle(TEvNodeWardenStorageConfig::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Main sync logic
+
+ void InitiateSync();
+ void DoMergeLoop();
+
+ template<typename T, typename TCallback>
+ bool DoMergeEntities(std::deque<T>& source, std::deque<T>& target, bool sourceFinished, bool targetFinished,
+ TCallback&& merge);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Per-group assimilation status
+
+ struct TAssimilateState {
+ std::optional<ui64> SkipBlocksUpTo;
+ std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo;
+ std::optional<TLogoBlobID> SkipBlobsUpTo;
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBlock> Blocks;
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBarrier> Barriers;
+ std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> Blobs;
+ bool RequestInFlight = false;
+ bool BlocksFinished = false;
+ bool BarriersFinished = false;
+ bool BlobsFinished = false;
+ };
+ THashMap<TGroupId, TAssimilateState> GroupAssimilateState;
+ TAssimilateState *TargetState = nullptr;
+ TAssimilateState *SourceState = nullptr;
+
+ void IssueAssimilateRequest(TGroupId groupId);
+ void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev);
+ };
+
+} // NKikimr::NStorage::NBridge
diff --git a/ydb/core/blobstorage/bridge/syncer/ya.make b/ydb/core/blobstorage/bridge/syncer/ya.make
new file mode 100644
index 00000000000..26dddfb1ec1
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/syncer/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+ SRCS(
+ defs.h
+ syncer.cpp
+ syncer.h
+ )
+
+ PEERDIR(
+ ydb/core/base
+ ydb/core/blobstorage/vdisk/common
+ )
+END()
diff --git a/ydb/core/blobstorage/bridge/ya.make b/ydb/core/blobstorage/bridge/ya.make
new file mode 100644
index 00000000000..2ccbd448541
--- /dev/null
+++ b/ydb/core/blobstorage/bridge/ya.make
@@ -0,0 +1,3 @@
+RECURSE(
+ syncer
+)
diff --git a/ydb/core/blobstorage/nodewarden/distconf.cpp b/ydb/core/blobstorage/nodewarden/distconf.cpp
index 6e92aec74ae..798fbe91101 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf.cpp
@@ -378,6 +378,7 @@ namespace NKikimr::NStorage {
hFunc(TEvNodeWardenQueryCache, Handle);
hFunc(TEvNodeWardenUnsubscribeFromCache, Handle);
hFunc(TEvNodeWardenUpdateConfigFromPeer, Handle);
+ hFunc(TEvNodeWardenManageSyncersResult, Handle);
)
for (ui32 nodeId : std::exchange(UnsubscribeQueue, {})) {
UnsubscribeInterconnect(nodeId);
diff --git a/ydb/core/blobstorage/nodewarden/distconf.h b/ydb/core/blobstorage/nodewarden/distconf.h
index 80aee341461..67b3b4efec0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf.h
+++ b/ydb/core/blobstorage/nodewarden/distconf.h
@@ -261,6 +261,13 @@ namespace NKikimr::NStorage {
bool ScepterlessOperationInProgress = false; // when a leader operation is running while no Scepter is acquired
TString ErrorReason;
std::optional<TString> CurrentSelfAssemblyUUID;
+ bool ConfigsCollected = false;
+
+ // bridge-related logic
+ std::set<std::tuple<ui32, TGroupId, TBridgePileId>> WorkingSyncersByNode;
+ std::set<std::tuple<TGroupId, TBridgePileId, ui32>> WorkingSyncers;
+ bool SyncerArrangeInFlight = false;
+ bool SyncerArrangePending = false;
// subscribed IC sessions
struct TSessionSubscription {
@@ -369,6 +376,7 @@ namespace NKikimr::NStorage {
void CheckRootNodeStatus();
void BecomeRoot();
void UnbecomeRoot();
+ void CheckIfDone();
void HandleErrorTimeout();
void ProcessGather(TEvGather *res);
bool HasQuorum(const NKikimrBlobStorage::TStorageConfig& config) const;
@@ -410,6 +418,23 @@ namespace NKikimr::NStorage {
void SwitchToError(const TString& reason);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Bridge ops
+
+ // preparation (may be async)
+ void PrepareScatterTask(ui64 cookie, TScatterTask& task, const TEvScatter::TManageSyncers& request);
+ void Handle(TEvNodeWardenManageSyncersResult::TPtr ev);
+
+ // execute per-node action
+ void Perform(TEvGather::TManageSyncers *response, const TEvScatter::TManageSyncers& request, TScatterTask& task);
+
+        // handle gather result (when completed across the whole cluster)
+ void ProcessManageSyncers(TEvGather::TManageSyncers *res);
+
+ void RearrangeSyncing();
+ void OnSyncerUnboundNode(ui32 nodeId);
+ void IssueQuerySyncers();
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Scatter/gather logic
void IssueScatterTask(std::optional<TActorId> actorId, TEvScatter&& request);
diff --git a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
index f0ca80520f2..21b596ed381 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_binding.cpp
@@ -564,6 +564,8 @@ namespace NKikimr::NStorage {
UnsubscribeQueue.insert(nodeId);
+ OnSyncerUnboundNode(nodeId);
+
CheckRootNodeStatus();
}
}
diff --git a/ydb/core/blobstorage/nodewarden/distconf_bridge.cpp b/ydb/core/blobstorage/nodewarden/distconf_bridge.cpp
new file mode 100644
index 00000000000..67d1982c3a0
--- /dev/null
+++ b/ydb/core/blobstorage/nodewarden/distconf_bridge.cpp
@@ -0,0 +1,202 @@
+#include "distconf.h"
+
+namespace NKikimr::NStorage {
+
+ void TDistributedConfigKeeper::PrepareScatterTask(ui64 cookie, TScatterTask& task, const TEvScatter::TManageSyncers& request) {
+ // issue query to node warden and wait until its completion
+ std::vector<TEvNodeWardenManageSyncers::TSyncer> runSyncers;
+ for (const auto& item : request.GetRunSyncers()) {
+ runSyncers.push_back({
+ .NodeId = item.GetNodeId(),
+ .GroupId = TGroupId::FromProto(&item, &NKikimrBlobStorage::TStorageSyncerInfo::GetGroupId),
+ .TargetBridgePileId = TBridgePileId::FromProto(&item, &NKikimrBlobStorage::TStorageSyncerInfo::GetTargetBridgePileId),
+ });
+ }
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), new TEvNodeWardenManageSyncers(std::move(runSyncers)), 0, cookie);
+ ++task.AsyncOperationsPending;
+ }
+
+ void TDistributedConfigKeeper::Handle(TEvNodeWardenManageSyncersResult::TPtr ev) {
+ if (auto it = ScatterTasks.find(ev->Cookie); it != ScatterTasks.end()) {
+ TScatterTask& task = it->second;
+ auto *response = task.Response.MutableManageSyncers();
+ auto *node = response->AddNodes();
+ node->SetNodeId(SelfId().NodeId());
+ for (const auto& workingSyncer : ev->Get()->WorkingSyncers) {
+ auto *item = node->AddSyncers();
+ workingSyncer.GroupId.CopyToProto(item, &NKikimrBlobStorage::TStorageSyncerInfo::SetGroupId);
+ workingSyncer.TargetBridgePileId.CopyToProto(item, &NKikimrBlobStorage::TStorageSyncerInfo::SetTargetBridgePileId);
+ }
+ }
+ FinishAsyncOperation(ev->Cookie);
+ }
+
+ void TDistributedConfigKeeper::Perform(TEvGather::TManageSyncers *response, const TEvScatter::TManageSyncers& /*request*/, TScatterTask& task) {
+ THashMap<ui32, TEvGather::TManageSyncers::TNode*> nodeMap;
+ for (size_t i = 0; i < response->NodesSize(); ++i) {
+ auto *node = response->MutableNodes(i);
+ nodeMap.emplace(node->GetNodeId(), node);
+ }
+ for (const auto& reply : task.CollectedResponses) {
+ for (const auto& node : reply.GetManageSyncers().GetNodes()) {
+ if (const auto it = nodeMap.find(node.GetNodeId()); it != nodeMap.end()) {
+ it->second->MergeFrom(node);
+ } else {
+ auto *newNode = response->AddNodes();
+ newNode->CopyFrom(node);
+ nodeMap.emplace(newNode->GetNodeId(), newNode);
+ }
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::ProcessManageSyncers(TEvGather::TManageSyncers *res) {
+ // actualize WorkingSyncers with received data
+ for (const auto& node : res->GetNodes()) {
+ const ui32 nodeId = node.GetNodeId();
+
+            // trim currently known working syncers for this node (as we have just received fresher info)
+ const auto min = std::make_tuple(nodeId, TGroupId::Min(), TBridgePileId::Min());
+ const auto begin = WorkingSyncersByNode.lower_bound(min);
+ const auto max = std::make_tuple(nodeId, TGroupId::Max(), TBridgePileId::Max());
+ const auto end = WorkingSyncersByNode.upper_bound(max);
+ for (auto it = begin; it != end; ++it) {
+ const auto& [nodeId, groupId, targetBridgePileId] = *it;
+ const size_t num = WorkingSyncers.erase(std::make_tuple(groupId, targetBridgePileId, nodeId));
+ Y_ABORT_UNLESS(num == 1);
+ }
+ WorkingSyncersByNode.erase(begin, end);
+
+ // insert newly received syncers in the set
+ for (const auto& item : node.GetSyncers()) {
+ using T = std::decay_t<decltype(item)>;
+ const auto groupId = TGroupId::FromProto(&item, &T::GetGroupId);
+ const auto targetBridgePileId = TBridgePileId::FromProto(&item, &T::GetTargetBridgePileId);
+ const bool ins1 = WorkingSyncersByNode.emplace(nodeId, groupId, targetBridgePileId).second;
+ const bool ins2 = WorkingSyncers.emplace(groupId, targetBridgePileId, nodeId).second;
+ Y_ABORT_UNLESS(ins1 == ins2);
+ }
+ }
+
+ // apply changes
+ RearrangeSyncing();
+ }
+
+ void TDistributedConfigKeeper::RearrangeSyncing() {
+ // run new syncers, stop unneeded ones
+ TEvScatter task;
+ TEvScatter::TManageSyncers *manage = nullptr;
+
+ auto getManage = [&] {
+ if (!manage) {
+ manage = task.MutableManageSyncers();
+ task.SetTaskId(RandomNumber<ui64>());
+ }
+ return manage;
+ };
+
+ for (auto it = WorkingSyncers.begin(); it != WorkingSyncers.end(); ) {
+ const auto& [groupId, targetBridgePileId, nodeId] = *it;
+
+ // find an end to this sequence
+ size_t numItems = 0;
+ auto jt = it;
+ while (jt != WorkingSyncers.end() && std::get<0>(*jt) == groupId && std::get<1>(*jt) == targetBridgePileId) {
+ ++jt, ++numItems;
+ }
+
+ // we have to terminate excessive syncer(s) here
+ if (numItems > 1) {
+                auto *entry = getManage()->AddRunSyncers(); // explicitly keep only a single syncer for this pair
+ const auto& [groupId, targetBridgePileId, nodeId] = *it;
+ entry->SetNodeId(nodeId);
+ groupId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetGroupId);
+ targetBridgePileId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetTargetBridgePileId);
+ }
+
+ // advance to next pair
+ it = jt;
+ }
+
+ std::vector<ui32> nodes;
+ auto prepareNodes = [&] {
+ if (nodes.empty()) {
+ // build list of all nodes expected to be working
+ nodes.push_back(SelfNode.NodeId());
+ for (const auto& [nodeId, info] : AllBoundNodes) {
+ nodes.push_back(nodeId.NodeId());
+ }
+ std::ranges::sort(nodes);
+ }
+ };
+
+ Y_ABORT_UNLESS(StorageConfig);
+ const auto& history = StorageConfig->GetClusterStateHistory();
+ for (const auto& item : history.GetPileSyncState()) {
+ const auto bridgePileId = TBridgePileId::FromProto(&item,
+ &NKikimrBridge::TClusterStateHistory::TPileSyncState::GetBridgePileId);
+
+ for (const auto& groupIdNum : item.GetUnsyncedGroupIds()) {
+ const auto groupId = TGroupId::FromValue(groupIdNum);
+
+ const auto key = std::make_tuple(groupId, bridgePileId, 0);
+ const auto maxKey = std::make_tuple(groupId, bridgePileId, Max<ui32>());
+ const auto it = WorkingSyncers.lower_bound(key);
+ if (it != WorkingSyncers.end() && *it <= maxKey) {
+ continue; // syncer already running
+ }
+
+ auto *entry = getManage()->AddRunSyncers();
+ const size_t hash = MultiHash(groupId, bridgePileId);
+ prepareNodes();
+ entry->SetNodeId(nodes[hash % nodes.size()]);
+ groupId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetGroupId);
+ bridgePileId.CopyToProto(entry, &NKikimrBlobStorage::TStorageSyncerInfo::SetTargetBridgePileId);
+ }
+ }
+
+ if (manage) {
+ IssueScatterTask(TActorId(), std::move(task));
+ } else {
+ Y_ABORT_UNLESS(SyncerArrangeInFlight);
+ SyncerArrangeInFlight = false;
+ if (std::exchange(SyncerArrangePending, false)) {
+ IssueQuerySyncers();
+ }
+ }
+ }
+
+ void TDistributedConfigKeeper::OnSyncerUnboundNode(ui32 nodeId) {
+ bool changes = false;
+
+ const auto min = std::make_tuple(nodeId, TGroupId::Min(), TBridgePileId::Min());
+ for (auto it = WorkingSyncersByNode.lower_bound(min); it != WorkingSyncersByNode.end() &&
+ std::get<0>(*it) == nodeId; it = WorkingSyncersByNode.erase(it)) {
+ const auto& [nodeId, groupId, targetBridgePileId] = *it;
+ const size_t num = WorkingSyncers.erase(std::make_tuple(groupId, targetBridgePileId, nodeId));
+ Y_ABORT_UNLESS(num == 1);
+ changes = true;
+ }
+
+ if (changes) {
+ RearrangeSyncing();
+ }
+ }
+
+ void TDistributedConfigKeeper::IssueQuerySyncers() {
+ if (!Cfg->BridgeConfig) {
+ return;
+ }
+ if (!SyncerArrangeInFlight) {
+ STLOG(PRI_DEBUG, BS_NODE, NWDC82, "Starting syncer collection", (Scepter, Scepter->Id), (RootState, RootState));
+ TEvScatter task;
+ task.SetTaskId(RandomNumber<ui64>());
+ task.MutableManageSyncers();
+ IssueScatterTask(TActorId(), std::move(task));
+ SyncerArrangeInFlight = true;
+ } else {
+ SyncerArrangePending = true;
+ }
+ }
+
+} // NKikimr::NStorage
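RearrangeSyncing assigns each unsynced (group, target pile) pair to a deterministic node: the pair is hashed and used to index the sorted list of currently bound nodes, so repeated recomputations on the root pick the same host while membership is stable, and duplicates found in WorkingSyncers are collapsed back to a single syncer. A rough model of the placement step, assuming a std::hash composition in place of MultiHash (illustrative only):

    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <vector>

    // Pick the node to host the syncer for (groupId, pileId): hash the pair
    // and index into the sorted node list, mimicking MultiHash(...) % size.
    uint32_t PickSyncerNode(uint32_t groupId, uint32_t pileId, std::vector<uint32_t> nodes) {
        std::sort(nodes.begin(), nodes.end()); // stable order across recomputations
        const uint64_t key = (uint64_t(groupId) << 32) | pileId;
        return nodes[std::hash<uint64_t>{}(key) % nodes.size()];
    }

As long as the bound-node set is unchanged, successive scatter rounds converge: the designated node keeps its syncer and every other node stops its duplicate.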
diff --git a/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp b/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp
index 94bfdd03601..c4664da6548 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_connectivity.cpp
@@ -82,7 +82,7 @@ namespace NKikimr::NStorage {
if (nodeId <= AppData()->DynamicNameserviceConfig->MaxStaticNodeId) {
continue; // static nodes were already processed
}
- STLOG(PRI_DEBUG, BS_NODE, NWDCC00, "disconnecting dynamic", (PeerNodeId, nodeId),
+ STLOG(PRI_DEBUG, BS_NODE, NWDCC03, "disconnecting dynamic", (PeerNodeId, nodeId),
(BridgePileId, pile.BridgePileId));
as->Send(new IEventHandle(TEvInterconnect::EvDisconnect, 0, as->InterconnectProxy(nodeId),
{}, nullptr, 0));
diff --git a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
index 47d585a0dc6..b2c11433c24 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_fsm.cpp
@@ -26,23 +26,24 @@ namespace NKikimr::NStorage {
}
void TDistributedConfigKeeper::BecomeRoot() {
- auto makeAllBoundNodes = [&] {
- TStringStream s;
- const char *sep = "{";
- for (const auto& [nodeId, _] : AllBoundNodes) {
- s << std::exchange(sep, " ") << nodeId;
- }
- s << '}';
- return s.Str();
- };
- STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection", (Scepter, Scepter->Id),
- (AllBoundNodes, makeAllBoundNodes()));
- RootState = ERootState::IN_PROGRESS;
+ RootState = ERootState::IN_PROGRESS; // collecting configs at least
+
+ WorkingSyncersByNode.clear();
+ WorkingSyncers.clear();
+ SyncerArrangeInFlight = false;
+ SyncerArrangePending = false;
+
+ // start collecting configs from all bound nodes
+ STLOG(PRI_DEBUG, BS_NODE, NWDC19, "Starting config collection", (Scepter, Scepter->Id));
+ ConfigsCollected = false;
TEvScatter task;
task.SetTaskId(RandomNumber<ui64>());
task.MutableCollectConfigs();
IssueScatterTask(TActorId(), std::move(task));
+ // start collecting syncers state if needed
+ IssueQuerySyncers();
+
// establish connection to console tablet (if we have means to do it)
Y_ABORT_UNLESS(!ConsolePipeId);
ConnectToConsole();
@@ -52,6 +53,12 @@ namespace NKikimr::NStorage {
DisconnectFromConsole();
}
+ void TDistributedConfigKeeper::CheckIfDone() {
+ if (RootState == ERootState::IN_PROGRESS && ConfigsCollected) {
+ RootState = ERootState::RELAX;
+ }
+ }
+
void TDistributedConfigKeeper::SwitchToError(const TString& reason) {
STLOG(PRI_NOTICE, BS_NODE, NWDC38, "SwitchToError", (RootState, RootState), (Reason, reason));
if (Scepter) {
@@ -93,10 +100,14 @@ namespace NKikimr::NStorage {
if (auto error = ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) {
SwitchToError(*error);
} else {
- RootState = ERootState::RELAX;
+ ConfigsCollected = true;
+ CheckIfDone();
}
return;
+ case TEvGather::kManageSyncers:
+ return ProcessManageSyncers(res->MutableManageSyncers());
+
case TEvGather::RESPONSE_NOT_SET:
return SwitchToError("response not set");
}
@@ -118,7 +129,8 @@ namespace NKikimr::NStorage {
std::visit(TOverloaded{
[&](std::monostate&) {
STLOG(PRI_DEBUG, BS_NODE, NWDC61, "ProcessCollectConfigs: monostate");
- RootState = ERootState::RELAX;
+ ConfigsCollected = true;
+ CheckIfDone();
},
[&](TString& error) {
STLOG(PRI_DEBUG, BS_NODE, NWDC63, "ProcessCollectConfigs: error", (Error, error));
@@ -540,6 +552,10 @@ namespace NKikimr::NStorage {
}
break;
+ case TEvScatter::kManageSyncers:
+ PrepareScatterTask(cookie, task, task.Request.GetManageSyncers());
+ break;
+
case TEvScatter::REQUEST_NOT_SET:
break;
}
@@ -555,6 +571,10 @@ namespace NKikimr::NStorage {
Perform(task.Response.MutableProposeStorageConfig(), task.Request.GetProposeStorageConfig(), task);
break;
+ case TEvScatter::kManageSyncers:
+ Perform(task.Response.MutableManageSyncers(), task.Request.GetManageSyncers(), task);
+ break;
+
case TEvScatter::REQUEST_NOT_SET:
// unexpected case
break;
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke.h b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
index e1be1c5f34b..2216b1390a0 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke.h
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke.h
@@ -15,6 +15,7 @@ namespace NKikimr::NStorage {
const TActorId RequestSessionId;
bool IsScepterlessOperation = false;
+ bool CheckSyncersAfterCommit = false;
TActorId ParentId;
ui32 WaitingReplyFromNode = 0;
@@ -129,6 +130,8 @@ namespace NKikimr::NStorage {
NKikimrBlobStorage::TStorageConfig GetSwitchBridgeNewConfig(const NKikimrBridge::TClusterState& newClusterState);
bool CheckSwitchBridgeCommand();
+ void NotifyBridgeSyncFinished(const TQuery::TNotifyBridgeSyncFinished& cmd);
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Configuration proposition
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp
index aa1f7dcb822..a452dfc7a1d 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_bridge.cpp
@@ -32,7 +32,30 @@ namespace NKikimr::NStorage {
Y_ABORT_UNLESS(current.PerPileStateSize() == numPiles);
ui32 numDifferent = 0;
for (ui32 i = 0; i < numPiles; ++i) {
- numDifferent += current.GetPerPileState(i) != newClusterState.GetPerPileState(i);
+ const auto currentState = current.GetPerPileState(i);
+ const auto newState = newClusterState.GetPerPileState(i);
+ if (currentState != newState) {
+ ++numDifferent;
+ switch (newState) {
+ case NKikimrBridge::TClusterState::DISCONNECTED:
+ // valid transition from any state
+ break;
+
+ case NKikimrBridge::TClusterState::SYNCHRONIZED:
+ // invalid transition from any state
+ return FinishWithError(TResult::ERROR, "can't switch to SYNCHRONIZED directly");
+
+ case NKikimrBridge::TClusterState::NOT_SYNCHRONIZED:
+// if (currentState == NKikimrBridge::TClusterState::SYNCHRONIZED) {
+// return FinishWithError(TResult::ERROR, "invalid transition from SYNCHRONIZED to NOT_SYNCHRONIZED");
+// }
+ break;
+
+ case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MIN_SENTINEL_DO_NOT_USE_:
+ case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MAX_SENTINEL_DO_NOT_USE_:
+ Y_ABORT();
+ }
+ }
}
if (numDifferent > 1) {
return FinishWithError(TResult::ERROR, "too many state changes in new configuration");
@@ -51,7 +74,16 @@ namespace NKikimr::NStorage {
const NKikimrBridge::TClusterState& newClusterState) {
NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
config.SetGeneration(config.GetGeneration() + 1);
- config.MutableClusterState()->CopyFrom(newClusterState);
+ auto *clusterState = config.MutableClusterState();
+ size_t changedPileIndex = Max<size_t>();
+ for (size_t i = 0; i < clusterState->PerPileStateSize(); ++i) {
+ if (clusterState->GetPerPileState(i) != newClusterState.GetPerPileState(i)) {
+ Y_ABORT_UNLESS(changedPileIndex == Max<size_t>());
+ changedPileIndex = i;
+ }
+ }
+ Y_ABORT_UNLESS(changedPileIndex != Max<size_t>());
+ clusterState->CopyFrom(newClusterState);
auto *history = config.MutableClusterStateHistory();
auto *entry = history->AddUnsyncedEntries();
@@ -61,6 +93,59 @@ namespace NKikimr::NStorage {
entry->AddUnsyncedPiles(i); // all piles are unsynced by default
}
+ switch (clusterState->GetPerPileState(changedPileIndex)) {
+ case NKikimrBridge::TClusterState::DISCONNECTED:
+                // this pile is now disconnected, so there is no reason to synchronize it anymore
+ for (size_t i = 0; i < history->PileSyncStateSize(); ++i) {
+ const auto& item = history->GetPileSyncState(i);
+ if (item.GetBridgePileId() != changedPileIndex) {
+ break;
+ }
+ history->MutablePileSyncState()->DeleteSubrange(i, 1);
+ break;
+ }
+ break;
+
+ case NKikimrBridge::TClusterState::NOT_SYNCHRONIZED: {
+                // we are coming into NOT_SYNCHRONIZED state; we have to build the list of entities that need syncing
+ NKikimrBridge::TClusterStateHistory::TPileSyncState *state = nullptr;
+ for (size_t i = 0; i < history->PileSyncStateSize(); ++i) {
+ state = history->MutablePileSyncState(i);
+ if (state->GetBridgePileId() == changedPileIndex) {
+ state->ClearUnsyncedGroupIds();
+ state->ClearUnsyncedBSC();
+ break;
+ } else {
+ state = nullptr;
+ }
+ }
+ if (!state) {
+ state = history->AddPileSyncState();
+ state->SetBridgePileId(changedPileIndex);
+ }
+ state->SetUnsyncedBSC(true);
+ if (config.HasBlobStorageConfig()) {
+ if (const auto& bsConfig = config.GetBlobStorageConfig(); bsConfig.HasServiceSet()) {
+ const auto& ss = bsConfig.GetServiceSet();
+ for (const auto& group : ss.GetGroups()) {
+ if (group.BridgeGroupIdsSize()) {
+ state->AddUnsyncedGroupIds(group.GetGroupID());
+ }
+ }
+ }
+ }
+ CheckSyncersAfterCommit = true;
+ break;
+ }
+
+ case NKikimrBridge::TClusterState::SYNCHRONIZED:
+ Y_ABORT("invalid transition");
+
+ case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MIN_SENTINEL_DO_NOT_USE_:
+ case NKikimrBridge::TClusterState_EPileState_TClusterState_EPileState_INT_MAX_SENTINEL_DO_NOT_USE_:
+ Y_ABORT();
+ }
+
return config;
}
@@ -77,4 +162,97 @@ namespace NKikimr::NStorage {
return Self->HasQuorum(GetSwitchBridgeNewConfig(record.GetSwitchBridgeClusterState().GetNewClusterState()));
}
+ void TInvokeRequestHandlerActor::NotifyBridgeSyncFinished(const TQuery::TNotifyBridgeSyncFinished& cmd) {
+ if (!RunCommonChecks()) {
+ return;
+ } else if (!Self->Cfg->BridgeConfig) {
+ return FinishWithError(TResult::ERROR, "Bridge mode is not enabled");
+ } else if (Self->Cfg->BridgeConfig->PilesSize() <= cmd.GetBridgePileId()) {
+ return FinishWithError(TResult::ERROR, "BridgePileId out of bounds");
+ } else if (Self->StorageConfig->GetClusterState().GetGeneration() != cmd.GetGeneration()) {
+ return FinishWithError(TResult::ERROR, "generation mismatch");
+ }
+
+ NKikimrBlobStorage::TStorageConfig config = *Self->StorageConfig;
+ config.SetGeneration(config.GetGeneration() + 1);
+
+ auto *clusterState = config.MutableClusterState();
+ auto *history = config.MutableClusterStateHistory();
+
+ size_t stateIndex;
+ NKikimrBridge::TClusterStateHistory::TPileSyncState *state = nullptr;
+ for (stateIndex = 0; stateIndex < history->PileSyncStateSize(); ++stateIndex) {
+ state = history->MutablePileSyncState(stateIndex);
+ if (state->GetBridgePileId() == cmd.GetBridgePileId()) {
+ break;
+ } else {
+ state = nullptr;
+ }
+ }
+ if (!state) {
+ return FinishWithError(TResult::ERROR, "unsynced pile not found");
+ }
+
+ switch (cmd.GetStatus()) {
+ case TQuery::TNotifyBridgeSyncFinished::Success:
+ if (cmd.HasBSC()) {
+ if (!cmd.GetBSC()) {
+ return FinishWithError(TResult::ERROR, "incorrect request");
+ }
+ state->SetUnsyncedBSC(false);
+ }
+ if (cmd.HasGroupId()) {
+ auto *groups = state->MutableUnsyncedGroupIds();
+ for (int i = 0; i < groups->size(); ++i) {
+ if (groups->at(i) == cmd.GetGroupId()) {
+ if (i != groups->size() - 1) {
+ groups->SwapElements(i, groups->size() - 1);
+ }
+ groups->RemoveLast();
+ break;
+ }
+ }
+ }
+ if (cmd.UnsyncedGroupIdsToAddSize()) {
+ const auto& v = cmd.GetUnsyncedGroupIdsToAdd();
+ state->MutableUnsyncedGroupIds()->Add(v.begin(), v.end());
+ }
+ if (state->UnsyncedGroupIdsSize()) {
+ auto *groups = state->MutableUnsyncedGroupIds();
+ std::ranges::sort(*groups);
+ const auto [first, last] = std::ranges::unique(*groups);
+ groups->erase(first, last);
+ }
+ if (!state->GetUnsyncedBSC() && !state->UnsyncedGroupIdsSize()) {
+ // fully synced, can switch to SYNCHRONIZED
+ history->MutablePileSyncState()->DeleteSubrange(stateIndex, 1);
+ clusterState->SetPerPileState(cmd.GetBridgePileId(), NKikimrBridge::TClusterState::SYNCHRONIZED);
+ }
+ break;
+
+ case TQuery::TNotifyBridgeSyncFinished::TransientError:
+ break;
+
+ case TQuery::TNotifyBridgeSyncFinished::PermanentError: {
+ bool found = false;
+ for (const auto& err : state->GetPermanentErrorReasons()) {
+ if (err == cmd.GetErrorReason()) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ state->AddPermanentErrorReasons(cmd.GetErrorReason());
+ }
+ break;
+ }
+
+ case NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_INT_MIN_SENTINEL_DO_NOT_USE_:
+ case NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_TEvNodeConfigInvokeOnRoot_TNotifyBridgeSyncFinished_EStatus_INT_MAX_SENTINEL_DO_NOT_USE_:
+ Y_ABORT();
+ }
+
+ StartProposition(&config);
+ }
+
} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
index 0ff0d6b9852..aa0d145b80a 100644
--- a/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
+++ b/ydb/core/blobstorage/nodewarden/distconf_invoke_common.cpp
@@ -21,7 +21,7 @@ namespace NKikimr::NStorage {
void TInvokeRequestHandlerActor::Bootstrap(TActorId parentId) {
if (LifetimeToken.expired()) {
- return FinishWithError(TResult::ERROR, "distributed config keeper terminated");
+ return FinishWithError(TResult::RACE, "distributed config keeper terminated");
}
STLOG(PRI_DEBUG, BS_NODE, NWDC42, "TInvokeRequestHandlerActor::Bootstrap", (Sender, Sender), (Cookie, Cookie),
@@ -31,10 +31,10 @@ namespace NKikimr::NStorage {
Become(&TThis::StateFunc);
if (Self->ScepterlessOperationInProgress) {
- FinishWithError(TResult::ERROR, "an operation is already in progress");
+ FinishWithError(TResult::RACE, "an operation is already in progress");
} else if (Self->Binding) {
if (RequestSessionId) {
- FinishWithError(TResult::ERROR, "no double-hop invokes allowed");
+ FinishWithError(TResult::RACE, "no double-hop invokes allowed");
} else {
const ui32 root = Self->Binding->RootNodeId;
Send(MakeBlobStorageNodeWardenID(root), Event->Release(), IEventHandle::FlagSubscribeOnSession);
@@ -148,6 +148,9 @@ namespace NKikimr::NStorage {
case TQuery::kGetStateStorageConfig:
return GetStateStorageConfig();
+
+ case TQuery::kNotifyBridgeSyncFinished:
+ return NotifyBridgeSyncFinished(record.GetNotifyBridgeSyncFinished());
}
FinishWithError(TResult::ERROR, "unhandled request");
@@ -270,6 +273,9 @@ namespace NKikimr::NStorage {
if (auto error = Self->ProcessProposeStorageConfig(res->MutableProposeStorageConfig())) {
return error;
}
+ if (CheckSyncersAfterCommit) {
+ InvokeOtherActor(*Self, &TDistributedConfigKeeper::IssueQuerySyncers);
+ }
Finish(Sender, SelfId(), PrepareResult(TResult::OK, std::nullopt).release(), 0, Cookie);
return std::nullopt;
};
@@ -299,13 +305,13 @@ namespace NKikimr::NStorage {
if (!Self->StorageConfig) {
FinishWithError(TResult::ERROR, "no agreed StorageConfig");
} else if (Self->CurrentProposedStorageConfig) {
- FinishWithError(TResult::ERROR, "config proposition request in flight");
+ FinishWithError(TResult::RACE, "config proposition request in flight");
} else if (Self->RootState != (IsScepterlessOperation ? ERootState::INITIAL : ERootState::RELAX)) {
- FinishWithError(TResult::ERROR, "something going on with default FSM");
+ FinishWithError(TResult::RACE, "something going on with default FSM");
} else if (auto error = ValidateConfig(*Self->StorageConfig)) {
FinishWithError(TResult::ERROR, TStringBuilder() << "current config validation failed: " << *error);
} else if (IsScepterExpired()) {
- FinishWithError(TResult::ERROR, "scepter lost during query execution");
+ FinishWithError(TResult::RACE, "scepter lost during query execution");
} else {
return true;
}
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_events.h b/ydb/core/blobstorage/nodewarden/node_warden_events.h
index 7868396cc12..1dd02772552 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_events.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_events.h
@@ -167,7 +167,7 @@ namespace NKikimr::NStorage {
};
struct TEvNodeWardenUpdateConfigFromPeer
- : TEventLocal<TEvNodeWardenUpdateConfigFromPeer, TEvBlobStorage::EvNodeWardenUpdateConfigFromPeer>
+ : TEventLocal<TEvNodeWardenUpdateConfigFromPeer, TEvBlobStorage::EvNodeWardenUpdateConfigFromPeer>
{
NKikimrBlobStorage::TStorageConfig StorageConfig;
@@ -176,4 +176,33 @@ namespace NKikimr::NStorage {
{}
};
+ struct TEvNodeWardenManageSyncers
+ : TEventLocal<TEvNodeWardenManageSyncers, TEvBlobStorage::EvNodeWardenManageSyncers>
+ {
+ struct TSyncer {
+ ui32 NodeId;
+ TGroupId GroupId;
+ TBridgePileId TargetBridgePileId;
+ };
+ std::vector<TSyncer> RunSyncers;
+
+ TEvNodeWardenManageSyncers(std::vector<TSyncer>&& runSyncers)
+ : RunSyncers(std::move(runSyncers))
+ {}
+ };
+
+ struct TEvNodeWardenManageSyncersResult
+ : TEventLocal<TEvNodeWardenManageSyncersResult, TEvBlobStorage::EvNodeWardenManageSyncersResult>
+ {
+ struct TSyncer {
+ TGroupId GroupId;
+ TBridgePileId TargetBridgePileId;
+ };
+ std::vector<TSyncer> WorkingSyncers;
+
+ TEvNodeWardenManageSyncersResult(std::vector<TSyncer>&& workingSyncers)
+ : WorkingSyncers(std::move(workingSyncers))
+ {}
+ };
+
} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
index 0a12d33a4ec..b42c740bdd0 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_group.cpp
@@ -2,6 +2,8 @@
#include "node_warden_impl.h"
#include "node_warden_events.h"
+#include <ydb/core/blobstorage/bridge/syncer/syncer.h>
+
#include <ydb/core/blob_depot/agent/agent.h>
#include <ydb/core/util/random.h>
@@ -238,6 +240,9 @@ namespace NKikimr::NStorage {
for (auto& vdisk : group.VDisksOfGroup) {
UpdateGroupInfoForDisk(vdisk, info);
}
+ for (const auto& [targetBridgePileId, actorId] : group.WorkingSyncers) {
+ Send(actorId, new TEvBlobStorage::TEvConfigureProxy(info, nullptr, nullptr));
+ }
}
if (const auto it = GroupPendingQueue.find(groupId); it != GroupPendingQueue.end()) {
@@ -318,4 +323,38 @@ namespace NKikimr::NStorage {
}
}
+ void TNodeWarden::HandleManageSyncers(TEvNodeWardenManageSyncers::TPtr ev) {
+ for (const auto& item : ev->Get()->RunSyncers) {
+ const ui32 groupId = item.GroupId.GetRawId();
+
+ if (item.NodeId == LocalNodeId) {
+ auto& group = Groups[groupId];
+ if (TActorId& actorId = group.WorkingSyncers[item.TargetBridgePileId]; !actorId) {
+ STLOG(PRI_DEBUG, BS_NODE, NW64, "starting syncer actor", (GroupId, item.GroupId),
+ (TargetBridgePileId, item.TargetBridgePileId));
+ actorId = Register(NBridge::CreateSyncerActor(NeedGroupInfo(groupId), item.TargetBridgePileId));
+ }
+ } else if (const auto it = Groups.find(groupId); it != Groups.end()) {
+ TGroupRecord& group = it->second;
+ if (const auto jt = group.WorkingSyncers.find(item.TargetBridgePileId); jt != group.WorkingSyncers.end()) {
+ STLOG(PRI_DEBUG, BS_NODE, NW64, "stopping syncer actor", (GroupId, item.GroupId),
+ (TargetBridgePileId, item.TargetBridgePileId));
+ TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, jt->second, SelfId(), nullptr, 0));
+ group.WorkingSyncers.erase(jt);
+ }
+ }
+ }
+
+ std::vector<TEvNodeWardenManageSyncersResult::TSyncer> workingSyncers;
+ for (const auto& [groupId, group] : Groups) {
+ for (const auto& [targetBridgePileId, actorId] : group.WorkingSyncers) {
+ workingSyncers.push_back({
+ .GroupId = TGroupId::FromValue(groupId),
+ .TargetBridgePileId = targetBridgePileId,
+ });
+ }
+ }
+ Send(ev->Sender, new TEvNodeWardenManageSyncersResult(std::move(workingSyncers)), 0, ev->Cookie);
+ }
+
} // NKikimr::NStorage
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index f7820eb3a36..d73b63d32b7 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -176,6 +176,8 @@ STATEFN(TNodeWarden::StateOnline) {
hFunc(TEvNodeWardenQueryCacheResult, Handle);
+ hFunc(TEvNodeWardenManageSyncers, HandleManageSyncers);
+
default:
EnqueuePendingMessage(ev);
break;
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 93f24586f93..9eacb0aa3a9 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -23,6 +23,7 @@ namespace NKikimr::NStorage {
struct TEvNodeWardenNotifyConfigMismatch;
struct TEvNodeWardenWriteMetadata;
struct TEvNodeWardenQueryCacheResult;
+ struct TEvNodeWardenManageSyncers;
constexpr ui32 ProxyConfigurationTimeoutMilliseconds = 200;
constexpr TDuration BackoffMin = TDuration::MilliSeconds(20);
@@ -526,6 +527,7 @@ namespace NKikimr::NStorage {
TActorId GroupResolver; // resolver actor id
TIntrusiveList<TVDiskRecord, TGroupRelationTag> VDisksOfGroup;
TNodeLayoutInfoPtr NodeLayoutInfo;
+ THashMap<TBridgePileId, TActorId> WorkingSyncers;
};
std::unordered_map<ui32, TGroupRecord> Groups;
@@ -630,6 +632,8 @@ namespace NKikimr::NStorage {
void Handle(NNodeWhiteboard::TEvWhiteboard::TEvBSGroupStateUpdate::TPtr ev);
+ void HandleManageSyncers(TAutoPtr<TEventHandle<TEvNodeWardenManageSyncers>> ev);
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
TActorId DistributedConfigKeeperId;
diff --git a/ydb/core/blobstorage/nodewarden/ya.make b/ydb/core/blobstorage/nodewarden/ya.make
index f1d42dd2f80..bfa05005083 100644
--- a/ydb/core/blobstorage/nodewarden/ya.make
+++ b/ydb/core/blobstorage/nodewarden/ya.make
@@ -6,6 +6,7 @@ SRCS(
distconf.cpp
distconf.h
distconf_binding.cpp
+ distconf_bridge.cpp
distconf_cache.cpp
distconf_connectivity.cpp
distconf_console.cpp
@@ -45,6 +46,7 @@ PEERDIR(
library/cpp/openssl/crypto
ydb/core/base
ydb/core/blob_depot/agent
+ ydb/core/blobstorage/bridge/syncer
ydb/core/blobstorage/common
ydb/core/blobstorage/crypto
ydb/core/blobstorage/dsproxy/bridge
diff --git a/ydb/core/blobstorage/ya.make b/ydb/core/blobstorage/ya.make
index 807b6abe948..d0927adddfa 100644
--- a/ydb/core/blobstorage/ya.make
+++ b/ydb/core/blobstorage/ya.make
@@ -31,6 +31,7 @@ END()
RECURSE(
backpressure
base
+ bridge
crypto
dsproxy
groupinfo
diff --git a/ydb/core/mind/bscontroller/bsc.cpp b/ydb/core/mind/bscontroller/bsc.cpp
index f2c54c90382..36bbe96b256 100644
--- a/ydb/core/mind/bscontroller/bsc.cpp
+++ b/ydb/core/mind/bscontroller/bsc.cpp
@@ -316,11 +316,38 @@ void TBlobStorageController::ApplyBscSettings(const NKikimrConfig::TBlobStorageC
updateSettings->CopyFrom(bsConfig.GetBscSettings());
- STLOG(PRI_DEBUG, BS_CONTROLLER, BSC17, "ApplyBSCSettings", (Request, r));
+ STLOG(PRI_DEBUG, BS_CONTROLLER, BSC39, "ApplyBSCSettings", (Request, r));
Send(SelfId(), ev.release());
}
void TBlobStorageController::ApplyStorageConfig(bool ignoreDistconf) {
+ InvokeOnRootTimer.Reset();
+ InvokeOnRootCmd.reset();
+
+ if (StorageConfig->HasClusterStateHistory()) {
+ const auto& history = StorageConfig->GetClusterStateHistory();
+ for (const auto& unsynced : history.GetPileSyncState()) {
+ if (unsynced.GetUnsyncedBSC()) {
+ auto ev = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
+ auto& record = ev->Record;
+ auto *nbsf = record.MutableNotifyBridgeSyncFinished();
+ nbsf->SetGeneration(StorageConfig->GetClusterState().GetGeneration());
+ nbsf->SetBridgePileId(unsynced.GetBridgePileId());
+ using TQuery = NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot::TNotifyBridgeSyncFinished;
+ nbsf->SetStatus(TQuery::Success);
+ nbsf->SetBSC(true);
+ for (const auto& [groupId, info] : GroupMap) {
+ if (info->BridgeGroupInfo) {
+ groupId.CopyToProto(nbsf, &TQuery::AddUnsyncedGroupIdsToAdd);
+ }
+ }
+ // remember the command in case it fails
+ InvokeOnRootCmd.emplace(record);
+ Send(MakeBlobStorageNodeWardenID(SelfId().NodeId()), ev.release());
+ }
+ }
+ }
+
if (!StorageConfig->HasBlobStorageConfig()) {
return;
}
@@ -426,6 +453,24 @@ void TBlobStorageController::ApplyStorageConfig(bool ignoreDistconf) {
}
}
+void TBlobStorageController::Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr ev) {
+ const auto status = ev->Get()->Record.GetStatus();
+ const bool success = status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::OK;
+ const bool retriable =
+ status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::RACE ||
+ status == NKikimrBlobStorage::TEvNodeConfigInvokeOnRootResult::NO_QUORUM;
+ STLOG(retriable ? PRI_INFO : success ? PRI_DEBUG : PRI_WARN, BS_CONTROLLER, BSC38, "TEvNodeConfigInvokeOnRootResult",
+ (Response, ev->Get()->Record), (Success, success), (Retriable, retriable));
+ if (success) {
+ InvokeOnRootCmd.reset();
+ } else if (retriable && InvokeOnRootCmd) {
+ auto ev = std::make_unique<NStorage::TEvNodeConfigInvokeOnRoot>();
+ ev->Record.CopyFrom(*InvokeOnRootCmd);
+ TActivationContext::Schedule(TDuration::MilliSeconds(InvokeOnRootTimer.NextBackoffMs()), new IEventHandle(
+ MakeBlobStorageNodeWardenID(SelfId().NodeId()), SelfId(), ev.release()));
+ }
+}
+
void TBlobStorageController::Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev) {
auto& record = ev->Get()->Record;
auto& response = record.GetResponse();
@@ -699,6 +744,7 @@ STFUNC(TBlobStorageController::StateWork) {
fFunc(TEvBlobStorage::EvControllerShredRequest, EnqueueIncomingEvent);
cFunc(TEvPrivate::EvUpdateShredState, ShredState.HandleUpdateShredState);
cFunc(TEvPrivate::EvCommitMetrics, CommitMetrics);
+ hFunc(NStorage::TEvNodeConfigInvokeOnRootResult, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
STLOG(PRI_ERROR, BS_CONTROLLER, BSC06, "StateWork unexpected event", (Type, type),
diff --git a/ydb/core/mind/bscontroller/impl.h b/ydb/core/mind/bscontroller/impl.h
index b45888a60eb..e5e1d1a172f 100644
--- a/ydb/core/mind/bscontroller/impl.h
+++ b/ydb/core/mind/bscontroller/impl.h
@@ -11,6 +11,8 @@
#include "self_heal.h"
#include "storage_pool_stat.h"
+#include <ydb/core/blobstorage/nodewarden/node_warden_events.h>
+
#include <util/generic/hash_multi_map.h>
inline IOutputStream& operator <<(IOutputStream& o, NKikimr::TErasureType::EErasureSpecies p) {
@@ -1639,6 +1641,9 @@ private:
class TConsoleInteraction;
std::unique_ptr<TConsoleInteraction> ConsoleInteraction;
+ TBackoffTimer InvokeOnRootTimer{10, 3000};
+ std::optional<NKikimrBlobStorage::TEvNodeConfigInvokeOnRoot> InvokeOnRootCmd;
+
struct TEvPrivate {
enum EEv {
EvUpdateSystemViews = EventSpaceBegin(TEvents::ES_PRIVATE),
@@ -1850,6 +1855,7 @@ private:
bool HostConfigEquals(const THostConfigInfo& left, const NKikimrBlobStorage::TDefineHostConfig& right) const;
void ApplyBscSettings(const NKikimrConfig::TBlobStorageConfig& bsConfig);
void ApplyStorageConfig(bool ignoreDistconf = false);
+ void Handle(NStorage::TEvNodeConfigInvokeOnRootResult::TPtr ev);
void Handle(TEvBlobStorage::TEvControllerConfigResponse::TPtr ev);
void Handle(TEvBlobStorage::TEvControllerDistconfRequest::TPtr ev);
diff --git a/ydb/core/mind/bscontroller/register_node.cpp b/ydb/core/mind/bscontroller/register_node.cpp
index 360eadc5aa7..7f9bbe66a4a 100644
--- a/ydb/core/mind/bscontroller/register_node.cpp
+++ b/ydb/core/mind/bscontroller/register_node.cpp
@@ -383,12 +383,12 @@ public:
if (!record.HasStorageConfigVersion() || record.GetStorageConfigVersion() < configVersion) {
yamlConfig->SetCompressedStorageConfig(CompressStorageYamlConfig(*Self->StorageYamlConfig));
} else if (configVersion < record.GetStorageConfigVersion()) {
- STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN09, "storage config version on node is greater than one known to BSC",
+ STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN10, "storage config version on node is greater than one known to BSC",
(NodeId, record.GetNodeID()),
(NodeVersion, record.GetMainConfigVersion()),
(StoredVersion, configVersion));
} else if (record.GetStorageConfigHash() != Self->StorageYamlConfigHash) {
- STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN11, "node storage config hash mismatch",
+ STLOG(PRI_ALERT, BS_CONTROLLER, BSCTXRN12, "node storage config hash mismatch",
(NodeId, record.GetNodeID()));
}
}
diff --git a/ydb/core/mind/bscontroller/shred.cpp b/ydb/core/mind/bscontroller/shred.cpp
index 243aadef31c..b8457281b0a 100644
--- a/ydb/core/mind/bscontroller/shred.cpp
+++ b/ydb/core/mind/bscontroller/shred.cpp
@@ -33,7 +33,7 @@ namespace NKikimr::NBsController {
{}
void Handle(TEvBlobStorage::TEvControllerShredRequest::TPtr ev) {
- STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "received TEvControllerShredRequest", (Record, ev->Get()->Record));
+ STLOG(PRI_DEBUG, BS_SHRED, BSSC00, "received TEvControllerShredRequest", (Record, ev->Get()->Record));
Self->Execute(new TTxUpdateShred(this, ev));
}
@@ -102,7 +102,7 @@ namespace NKikimr::NBsController {
}
for (auto& [nodeId, ev] : outbox) {
- STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "issuing TEvControllerNodeServiceSetUpdate", (NodeId, nodeId),
+ STLOG(PRI_DEBUG, BS_SHRED, BSSC01, "issuing TEvControllerNodeServiceSetUpdate", (NodeId, nodeId),
(Record, ev->Record));
Self->Send(MakeBlobStorageNodeWardenID(nodeId), ev.release());
}
@@ -123,7 +123,7 @@ namespace NKikimr::NBsController {
}
void OnShredFinished(TPDiskId pdiskId, TPDiskInfo& pdiskInfo, ui64 generation, TTransactionContext& txc) {
- STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "shred finished", (PDiskId, pdiskId), (Generation, generation),
+ STLOG(PRI_DEBUG, BS_SHRED, BSSC02, "shred finished", (PDiskId, pdiskId), (Generation, generation),
(ShredInProgress, pdiskInfo.ShredInProgress), (ShredComplete, pdiskInfo.ShredComplete),
(CurrentGeneration, GetCurrentGeneration()));
@@ -151,7 +151,7 @@ namespace NKikimr::NBsController {
}
void OnShredAborted(TPDiskId pdiskId, TPDiskInfo& pdiskInfo) {
- STLOG(PRI_DEBUG, BS_SHRED, BSSCxx, "shred aborted", (PDiskId, pdiskId),
+ STLOG(PRI_DEBUG, BS_SHRED, BSSC03, "shred aborted", (PDiskId, pdiskId),
(ShredInProgress, pdiskInfo.ShredInProgress), (ShredComplete, pdiskInfo.ShredComplete));
EndShredForPDisk(pdiskId, pdiskInfo);
diff --git a/ydb/core/protos/blobstorage_distributed_config.proto b/ydb/core/protos/blobstorage_distributed_config.proto
index 156fb6a5aa2..90fbe128f88 100644
--- a/ydb/core/protos/blobstorage_distributed_config.proto
+++ b/ydb/core/protos/blobstorage_distributed_config.proto
@@ -93,6 +93,12 @@ message TEvNodeConfigReversePush {
message TEvNodeConfigUnbind {
}
+message TStorageSyncerInfo {
+    uint32 NodeId = 1; // the node that runs the blobstorage syncer actor
+    uint32 GroupId = 2; // bridged group id
+    uint32 TargetBridgePileId = 3; // the target bridge pile the sync is directed at
+}
+
// Propagate query to the tree bottom and collect replies.
message TEvNodeConfigScatter {
message TCollectConfigs {
@@ -102,12 +108,17 @@ message TEvNodeConfigScatter {
TStorageConfig Config = 1;
}
+ message TManageSyncers {
+        repeated TStorageSyncerInfo RunSyncers = 1; // syncers expected to be running (implies stopping syncers for the same group/pile on other nodes)
+ }
+
optional uint64 Cookie = 1;
optional fixed64 TaskId = 4; // random (and not necessarily unique) identifier
oneof Request {
TCollectConfigs CollectConfigs = 2;
TProposeStorageConfig ProposeStorageConfig = 3;
+ TManageSyncers ManageSyncers = 5;
}
}
@@ -158,12 +169,21 @@ message TEvNodeConfigGather {
repeated TStatus Status = 1;
}
+ message TManageSyncers {
+ message TNode {
+ uint32 NodeId = 1;
+ repeated TStorageSyncerInfo Syncers = 2;
+ }
+ repeated TNode Nodes = 1;
+ }
+
optional uint64 Cookie = 1;
optional bool Aborted = 4;
oneof Response {
TCollectConfigs CollectConfigs = 2;
TProposeStorageConfig ProposeStorageConfig = 3;
+ TManageSyncers ManageSyncers = 5;
}
}
@@ -244,6 +264,23 @@ message TEvNodeConfigInvokeOnRoot {
NKikimrBridge.TClusterState NewClusterState = 1;
}
+ message TNotifyBridgeSyncFinished {
+ enum EStatus {
+ Success = 0;
+ TransientError = 1;
+ PermanentError = 2;
+ }
+ uint64 Generation = 1; // generation matching ClusterState's one
+ uint32 BridgePileId = 2; // which bridge pile is marked synchronized
+ EStatus Status = 3; // status of this synchronization
+ optional string ErrorReason = 4; // textual description of an error
+ oneof Entity {
+ uint32 GroupId = 5;
+ bool BSC = 6;
+ }
+ repeated uint32 UnsyncedGroupIdsToAdd = 7;
+ }
+
oneof Request {
TUpdateConfig UpdateConfig = 1;
TQueryConfig QueryConfig = 2;
@@ -258,6 +295,7 @@ message TEvNodeConfigInvokeOnRoot {
TSwitchBridgeClusterState SwitchBridgeClusterState = 11;
TStateStorageConfig ReconfigStateStorage = 12;
TGetStateStorageConfig GetStateStorageConfig = 13;
+ TNotifyBridgeSyncFinished NotifyBridgeSyncFinished = 14;
}
}
diff --git a/ydb/core/protos/bridge.proto b/ydb/core/protos/bridge.proto
index 4182f5a8644..90e56c0d8bc 100644
--- a/ydb/core/protos/bridge.proto
+++ b/ydb/core/protos/bridge.proto
@@ -22,4 +22,12 @@ message TClusterStateHistory {
repeated uint32 UnsyncedPiles = 3; // a list of pile ids that did not refer to this entry; trimmed when empty
}
repeated TUnsyncedEntry UnsyncedEntries = 1; // entries stored in order of ascending Generation
+
+ message TPileSyncState {
+ uint32 BridgePileId = 1;
+ repeated string PermanentErrorReasons = 2; // a set of detected permanent errors for this pile
+ repeated uint32 UnsyncedGroupIds = 3; // a set of unsynced groups for this pile
+ bool UnsyncedBSC = 4; // true if BSC is not synced with this configuration yet
+ }
+ repeated TPileSyncState PileSyncState = 2; // per-pile sync state for current generation
}
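TPileSyncState acts as a draining checklist for a reconnecting pile: each successful NotifyBridgeSyncFinished removes a group id (or clears UnsyncedBSC), and once both are empty the invoke handler deletes the entry and flips the pile to SYNCHRONIZED. The bookkeeping reduces to roughly this (plain containers, illustrative names):

    #include <cstdint>
    #include <optional>
    #include <set>

    struct TPileSync {
        std::set<uint32_t> UnsyncedGroupIds; // groups still being synced
        bool UnsyncedBSC = true;             // BSC not yet caught up
    };

    // Record one completed sync; true means the pile can go SYNCHRONIZED
    // and the TPileSyncState entry can be dropped from the history.
    bool OnSyncFinished(TPileSync& s, std::optional<uint32_t> groupId, bool bscSynced) {
        if (groupId) {
            s.UnsyncedGroupIds.erase(*groupId);
        }
        if (bscSynced) {
            s.UnsyncedBSC = false;
        }
        return s.UnsyncedGroupIds.empty() && !s.UnsyncedBSC;
    }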
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index d3cc385c4b6..d59c5d3b1bd 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -58,6 +58,7 @@ enum EServiceKikimr {
BS_PROXY_CHECKINTEGRITY = 2603;
BS_PROXY_BRIDGE = 2604;
BS_CLUSTER_BALANCING = 2605;
+ BS_BRIDGE_SYNC = 2606;
// DATASHARD section //
TX_DATASHARD = 290; //