aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Dmitriev <monster@ydb.tech>2025-03-14 17:54:01 +0300
committerGitHub <noreply@github.com>2025-03-14 17:54:01 +0300
commit75a742da98209e759a27cae4fcd509f534d6fb6b (patch)
tree0da6249feadd1bd8449516c53ea5eae6cff3e4c6
parent53b8065b350b34973e07df34a4967ec44cdacf11 (diff)
downloadydb-75a742da98209e759a27cae4fcd509f534d6fb6b.tar.gz
introduce VDisk synchronization broker (#15710)
-rw-r--r--ydb/core/base/blobstorage.h3
-rw-r--r--ydb/core/base/services/blobstorage_service_id.h5
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.cpp3
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.cpp138
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h30
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp51
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/ya.make2
-rw-r--r--ydb/core/blobstorage/vdisk/vdisk_services.h2
-rw-r--r--ydb/library/services/services.proto1
9 files changed, 222 insertions, 13 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 1832c591495..295252cdd29 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -757,6 +757,9 @@ struct TEvBlobStorage {
EvHugeQueryForbiddenChunks,
EvHugeForbiddenChunks,
EvContinueShred,
+ EvQuerySyncToken,
+ EvSyncToken,
+ EvReleaseSyncToken,
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
diff --git a/ydb/core/base/services/blobstorage_service_id.h b/ydb/core/base/services/blobstorage_service_id.h
index 7ca0ee1a0dd..f3ca9f02db9 100644
--- a/ydb/core/base/services/blobstorage_service_id.h
+++ b/ydb/core/base/services/blobstorage_service_id.h
@@ -76,6 +76,11 @@ inline TActorId MakeBlobStorageReplBrokerID() {
return TActorId(0, TStringBuf(x, 12));
}
+// Builds the well-known actor id of the blob storage sync broker service
+// (node component 0, 12-byte service name "bssyncbroker").
+inline TActorId MakeBlobStorageSyncBrokerID() {
+    static constexpr char serviceName[12] = {'b', 's', 's', 'y', 'n', 'c', 'b', 'r', 'o', 'k', 'e', 'r'};
+    return TActorId(0, TStringBuf(serviceName, sizeof(serviceName)));
+}
+
inline TActorId MakeBlobStorageNodeWardenID(ui32 node) {
char x[12] = {'b','s','n','o','d','e','c','n','t','r','l','r'};
return TActorId(node, TStringBuf(x, 12));
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index 6fe9683e99e..80919b9f7f5 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.h>
#include <ydb/core/blobstorage/pdisk/drivedata_serializer.h>
#include <ydb/core/blobstorage/vdisk/repl/blobstorage_replbroker.h>
+#include <ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h>
#include <ydb/library/pdisk_io/file_params.h>
#include <ydb/core/mind/bscontroller/yaml_config_helpers.h>
#include <ydb/core/base/nameservice.h>
@@ -424,6 +425,8 @@ void TNodeWarden::Bootstrap() {
const ui64 maxBytes = replBrokerConfig.GetMaxInFlightReadBytes();
actorSystem->RegisterLocalService(MakeBlobStorageReplBrokerID(), Register(CreateReplBrokerActor(maxBytes)));
+ actorSystem->RegisterLocalService(MakeBlobStorageSyncBrokerID(), Register(CreateSyncBrokerActor()));
+
// determine if we are running in 'mock' mode
EnableProxyMock = Cfg->BlobStorageConfig.GetServiceSet().GetEnableProxyMock();
diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.cpp
new file mode 100644
index 00000000000..4a4801c799b
--- /dev/null
+++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.cpp
@@ -0,0 +1,138 @@
+#include "blobstorage_syncer_broker.h"
+
+#include <ydb/core/blobstorage/vdisk/common/vdisk_log.h>
+
+namespace NKikimr {
+
+ // Hands out sync tokens so that at most ACTIVE_SYNC_LIMIT distinct VDisks hold tokens at
+ // the same time. Requests are grouped by the VDisk actor id carried in the event:
+ //  * if the VDisk is already active, or a slot is free, the requester gets TEvSyncToken
+ //    immediately;
+ //  * otherwise the requester is parked in WaitQueue (one entry per VDisk, FIFO order) and
+ //    receives TEvSyncToken once an active VDisk has released all of its tokens.
+ class TSyncBroker : public TActor<TSyncBroker> {
+     // Maximum number of distinct VDisks that may hold tokens simultaneously.
+     static constexpr size_t ACTIVE_SYNC_LIMIT = 8;
+
+     // VDisk actor id -> actors that currently hold a token on behalf of that VDisk.
+     std::unordered_map<TActorId, std::unordered_set<TActorId>> Active;
+
+     // All requesters of one VDisk that are waiting for a free slot.
+     struct TWaitSync {
+         TActorId VDiskActorId;
+         std::unordered_set<TActorId> ActorIds;
+     };
+     std::list<TWaitSync> WaitQueue; // TODO: better search
+
+     // Linear search for the waiting entry of the given VDisk.
+     // Returns WaitQueue.end() when the VDisk is not waiting.
+     std::list<TWaitSync>::iterator FindWaiting(const TActorId& vDiskActorId) {
+         return std::find_if(WaitQueue.begin(), WaitQueue.end(), [&vDiskActorId](const TWaitSync& item) {
+             return item.VDiskActorId == vDiskActorId;
+         });
+     }
+
+ public:
+     static constexpr auto ActorActivityType() {
+         return NKikimrServices::TActivity::BS_SYNC_BROKER;
+     }
+
+     STRICT_STFUNC(StateFunc,
+         hFunc(TEvQuerySyncToken, Handle)
+         hFunc(TEvReleaseSyncToken, Handle)
+     )
+
+     TSyncBroker()
+         : TActor(&TSyncBroker::StateFunc)
+     {}
+
+     // Token request: grant it right away when possible, otherwise enqueue the sender.
+     void Handle(TEvQuerySyncToken::TPtr& ev) {
+         const auto vDiskActorId = ev->Get()->VDiskActorId;
+         const auto actorId = ev->Sender;
+
+         // A VDisk that already occupies a slot does not consume another one, so its
+         // requesters are always served; a new VDisk is served only while a slot is free.
+         if (Active.count(vDiskActorId) || Active.size() < ACTIVE_SYNC_LIMIT) {
+             Active[vDiskActorId].insert(actorId);
+             Send(actorId, new TEvSyncToken);
+
+             LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER,
+                 "TEvQuerySyncToken, VDisk actor id: " << vDiskActorId <<
+                 ", actor id: " << actorId <<
+                 ", token sent, active: " << Active.size() <<
+                 ", waiting: " << WaitQueue.size());
+             return;
+         }
+
+         // No free slot: join the existing waiting entry of this VDisk or append a new one.
+         auto it = FindWaiting(vDiskActorId);
+         if (it == WaitQueue.end()) {
+             it = WaitQueue.insert(WaitQueue.end(), TWaitSync{vDiskActorId, {}});
+         }
+         it->ActorIds.insert(actorId);
+
+         LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER,
+             "TEvQuerySyncToken, VDisk actor id: " << vDiskActorId <<
+             ", actor id: " << actorId <<
+             ", enqueued, active: " << Active.size() <<
+             ", waiting: " << WaitQueue.size());
+     }
+
+     // Promotes waiting VDisks to active (in queue order) while free slots remain and
+     // sends a token to every actor that was waiting for the promoted VDisk.
+     void ProcessQueue() {
+         while (!WaitQueue.empty() && Active.size() < ACTIVE_SYNC_LIMIT) {
+             // Take ownership of the head entry first: moving through a const reference
+             // would silently copy the actor set instead of moving it.
+             TWaitSync waitSync = std::move(WaitQueue.front());
+             WaitQueue.pop_front();
+
+             for (const auto& actorId : waitSync.ActorIds) {
+                 Send(actorId, new TEvSyncToken);
+             }
+             Active[waitSync.VDiskActorId] = std::move(waitSync.ActorIds);
+         }
+     }
+
+     // Token release: drops the sender from the active set (freeing the slot when it was
+     // the last holder) or from the wait queue. A release for a VDisk that is neither
+     // active nor waiting is ignored without logging.
+     void Handle(TEvReleaseSyncToken::TPtr& ev) {
+         const auto vDiskActorId = ev->Get()->VDiskActorId;
+         const auto actorId = ev->Sender;
+
+         if (const auto it = Active.find(vDiskActorId); it != Active.end()) {
+             it->second.erase(actorId);
+             if (it->second.empty()) {
+                 // erase by iterator: no second hash lookup for the same key
+                 Active.erase(it);
+                 ProcessQueue();
+             }
+
+             LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER,
+                 "TEvReleaseSyncToken, VDisk actor id: " << vDiskActorId <<
+                 ", actor id: " << actorId <<
+                 ", token released, active: " << Active.size() <<
+                 ", waiting: " << WaitQueue.size());
+             return;
+         }
+
+         if (const auto it = FindWaiting(vDiskActorId); it != WaitQueue.end()) {
+             it->ActorIds.erase(actorId);
+             if (it->ActorIds.empty()) {
+                 WaitQueue.erase(it);
+             }
+
+             LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER,
+                 "TEvReleaseSyncToken, VDisk actor id: " << vDiskActorId <<
+                 ", actor id: " << actorId <<
+                 ", removed from queue, active: " << Active.size() <<
+                 ", waiting: " << WaitQueue.size());
+         }
+     }
+ };
+
+ // Factory for the sync broker actor; the caller takes ownership of the returned object.
+ IActor *CreateSyncBrokerActor() {
+     return new TSyncBroker();
+ }
+
+} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h
new file mode 100644
index 00000000000..9e1995e280f
--- /dev/null
+++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "defs.h"
+#include <ydb/core/base/blobstorage.h>
+#include <ydb/core/blobstorage/base/blobstorage_vdiskid.h>
+
+namespace NKikimr {
+
+ // Local event asking the sync broker for a token.
+ // VDiskActorId identifies the VDisk on whose behalf the sender is asking.
+ struct TEvQuerySyncToken : TEventLocal<TEvQuerySyncToken, TEvBlobStorage::EvQuerySyncToken> {
+     TActorId VDiskActorId;
+
+     explicit TEvQuerySyncToken(const TActorId& id)
+         : VDiskActorId{id}
+     {}
+ };
+
+ // Local event sent by the sync broker to a requester when its token is granted; carries no payload.
+ struct TEvSyncToken : TEventLocal<TEvSyncToken, TEvBlobStorage::EvSyncToken> {
+ };
+
+ // Local event telling the sync broker that the sender gives up its token (or its place in
+ // the wait queue) for the VDisk identified by VDiskActorId.
+ struct TEvReleaseSyncToken : TEventLocal<TEvReleaseSyncToken, TEvBlobStorage::EvReleaseSyncToken> {
+     TActorId VDiskActorId;
+
+     explicit TEvReleaseSyncToken(const TActorId& id)
+         : VDiskActorId{id}
+     {}
+ };
+
+ // Creates the sync broker actor that serves TEvQuerySyncToken / TEvReleaseSyncToken.
+ IActor *CreateSyncBrokerActor();
+
+} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp
index 7a9454ced70..871171b52ff 100644
--- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp
+++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp
@@ -1,4 +1,5 @@
#include "blobstorage_syncer_recoverlostdata_proxy.h"
+#include "blobstorage_syncer_broker.h"
#include "blobstorage_syncer_committer.h"
#include "blobstorage_syncer_data.h"
#include "blobstorage_syncquorum.h"
@@ -45,6 +46,39 @@ namespace NKikimr {
TVDiskID TargetVDiskId;
TActorId TargetActorId;
+ // Actor entry point: logs the start, asks the sync broker for a token (with delivery
+ // tracking, so an unreachable broker comes back as TEvUndelivered) and switches to the
+ // state that waits for the broker's answer.
+ void Bootstrap(const TActorContext &ctx) {
+     LOG_DEBUG(ctx, BS_SYNCER,
+         VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
+             "TSyncerRLDFullSyncProxyActor(%s): START",
+             TargetVDiskId.ToString().data()));
+
+     const TActorId brokerId = MakeBlobStorageSyncBrokerID();
+     auto *query = new TEvQuerySyncToken(SyncerCtx->VCtx->VDiskActorId);
+     ctx.Send(brokerId, query, IEventHandle::FlagTrackDelivery);
+
+     Become(&TThis::WaitForBrokerStateFunc);
+ }
+
+ // The sync broker granted a token: log the event and proceed via CreateAndRunTask.
+ // The event itself carries no data, hence Y_UNUSED.
+ void Handle(TEvSyncToken::TPtr &ev, const TActorContext &ctx) {
+ Y_UNUSED(ev);
+ LOG_DEBUG(ctx, BS_SYNCER,
+ VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
+ "TSyncerRLDFullSyncProxyActor(%s): TEvSyncToken received",
+ TargetVDiskId.ToString().data()));
+ CreateAndRunTask(ctx);
+ }
+
+ // Delivery failure notification. Only an undelivered TEvQuerySyncToken is acted upon:
+ // it means there is no sync broker service, so the task is started without a token.
+ // Any other undelivered event type is ignored.
+ void Handle(TEvents::TEvUndelivered::TPtr &ev, const TActorContext &ctx) {
+     const bool queryUndelivered = ev->Get()->SourceType == TEvQuerySyncToken::EventType;
+     if (!queryUndelivered) {
+         return;
+     }
+     // no sync broker service
+     CreateAndRunTask(ctx);
+ }
+
+ // State entered after a TEvQuerySyncToken has been sent to the sync broker: the actor
+ // waits either for the token (TEvSyncToken) or for a delivery failure (TEvUndelivered),
+ // while still reacting to poison pills and VDisk generation changes.
+ STRICT_STFUNC(WaitForBrokerStateFunc,
+ HFunc(TEvents::TEvPoisonPill, HandlePoison)
+ HFunc(TEvSyncToken, Handle)
+ HFunc(TEvents::TEvUndelivered, Handle)
+ HFunc(TEvVGenerationChange, Handle)
+ )
+
void CreateAndRunTask(const TActorContext &ctx) {
// create task
auto task = std::make_unique<TSyncerJobTask>(TSyncerJobTask::EFullRecover, TargetVDiskId, TargetActorId,
@@ -56,17 +90,8 @@ namespace NKikimr {
Become(&TThis::WaitForSyncStateFunc);
}
- void Bootstrap(const TActorContext &ctx) {
- LOG_DEBUG(ctx, BS_SYNCER,
- VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
- "TSyncerRLDFullSyncProxyActor(%s): START",
- TargetVDiskId.ToString().data()));
-
- // run job
- CreateAndRunTask(ctx);
- }
-
void Handle(TEvSyncerJobDone::TPtr &ev, const TActorContext &ctx) {
+ ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvReleaseSyncToken(SyncerCtx->VCtx->VDiskActorId));
LOG_DEBUG(ctx, BS_SYNCER,
VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
"TSyncerRLDFullSyncProxyActor(%s): TEvSyncerJobDone; Task# %s",
@@ -103,8 +128,9 @@ namespace NKikimr {
void Handle(TEvSyncerRLDWakeup::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev);
- // run job again
- CreateAndRunTask(ctx);
+ ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvQuerySyncToken(SyncerCtx->VCtx->VDiskActorId),
+ IEventHandle::FlagTrackDelivery);
+ Become(&TThis::WaitForBrokerStateFunc);
}
STRICT_STFUNC(WaitForTimeoutStateFunc,
@@ -147,6 +173,7 @@ namespace NKikimr {
// HandlePoison
////////////////////////////////////////////////////////////////////////
void HandlePoison(TEvents::TEvPoisonPill::TPtr &ev, const TActorContext &ctx) {
+ ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvReleaseSyncToken(SyncerCtx->VCtx->VDiskActorId));
LOG_DEBUG(ctx, BS_SYNCER,
VDISKP(SyncerCtx->VCtx->VDiskLogPrefix,
"TSyncerRLDFullSyncProxyActor(%s): PoisonPill",
diff --git a/ydb/core/blobstorage/vdisk/syncer/ya.make b/ydb/core/blobstorage/vdisk/syncer/ya.make
index fae14c75832..56746d96e5b 100644
--- a/ydb/core/blobstorage/vdisk/syncer/ya.make
+++ b/ydb/core/blobstorage/vdisk/syncer/ya.make
@@ -15,6 +15,8 @@ PEERDIR(
SRCS(
defs.h
+ blobstorage_syncer_broker.cpp
+ blobstorage_syncer_broker.h
blobstorage_syncer_committer.cpp
blobstorage_syncer_committer.h
blobstorage_syncer.cpp
diff --git a/ydb/core/blobstorage/vdisk/vdisk_services.h b/ydb/core/blobstorage/vdisk/vdisk_services.h
index 8f4ad3a98cd..7cb2f6f00ea 100644
--- a/ydb/core/blobstorage/vdisk/vdisk_services.h
+++ b/ydb/core/blobstorage/vdisk/vdisk_services.h
@@ -7,6 +7,6 @@ namespace NKikimr {
// Some services to run on a node for correct functionality of VDisks
extern IActor *CreateReplBrokerActor(ui64 maxMemBytes);
+ extern IActor *CreateSyncBrokerActor();
} // NKikimr
-
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index bf3a85d80e7..15d6849f0ce 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -1084,5 +1084,6 @@ message TActivity {
REPLICATION_TRANSFER_WRITER = 657;
DATA_ERASURE = 658;
TENANT_DATA_ERASURE = 659;
+ BS_SYNC_BROKER = 660;
};
};