diff options
author | Aleksandr Dmitriev <monster@ydb.tech> | 2025-03-14 17:54:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-14 17:54:01 +0300 |
commit | 75a742da98209e759a27cae4fcd509f534d6fb6b (patch) | |
tree | 0da6249feadd1bd8449516c53ea5eae6cff3e4c6 | |
parent | 53b8065b350b34973e07df34a4967ec44cdacf11 (diff) | |
download | ydb-75a742da98209e759a27cae4fcd509f534d6fb6b.tar.gz |
introduce VDisk syncronization broker (#15710)
9 files changed, 222 insertions, 13 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 1832c591495..295252cdd29 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -757,6 +757,9 @@ struct TEvBlobStorage { EvHugeQueryForbiddenChunks, EvHugeForbiddenChunks, EvContinueShred, + EvQuerySyncToken, + EvSyncToken, + EvReleaseSyncToken, EvYardInitResult = EvPut + 9 * 512, /// 268 636 672 EvLogResult, diff --git a/ydb/core/base/services/blobstorage_service_id.h b/ydb/core/base/services/blobstorage_service_id.h index 7ca0ee1a0dd..f3ca9f02db9 100644 --- a/ydb/core/base/services/blobstorage_service_id.h +++ b/ydb/core/base/services/blobstorage_service_id.h @@ -76,6 +76,11 @@ inline TActorId MakeBlobStorageReplBrokerID() { return TActorId(0, TStringBuf(x, 12)); } +inline TActorId MakeBlobStorageSyncBrokerID() { + char x[12] = {'b', 's', 's', 'y', 'n', 'c', 'b', 'r', 'o', 'k', 'e', 'r'}; + return TActorId(0, TStringBuf(x, 12)); +} + inline TActorId MakeBlobStorageNodeWardenID(ui32 node) { char x[12] = {'b','s','n','o','d','e','c','n','t','r','l','r'}; return TActorId(node, TStringBuf(x, 12)); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 6fe9683e99e..80919b9f7f5 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -10,6 +10,7 @@ #include <ydb/core/blobstorage/dsproxy/dsproxy_nodemonactor.h> #include <ydb/core/blobstorage/pdisk/drivedata_serializer.h> #include <ydb/core/blobstorage/vdisk/repl/blobstorage_replbroker.h> +#include <ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h> #include <ydb/library/pdisk_io/file_params.h> #include <ydb/core/mind/bscontroller/yaml_config_helpers.h> #include <ydb/core/base/nameservice.h> @@ -424,6 +425,8 @@ void TNodeWarden::Bootstrap() { const ui64 maxBytes = replBrokerConfig.GetMaxInFlightReadBytes(); actorSystem->RegisterLocalService(MakeBlobStorageReplBrokerID(), Register(CreateReplBrokerActor(maxBytes))); + actorSystem->RegisterLocalService(MakeBlobStorageSyncBrokerID(), Register(CreateSyncBrokerActor())); + // determine if we are running in 'mock' mode EnableProxyMock = Cfg->BlobStorageConfig.GetServiceSet().GetEnableProxyMock(); diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.cpp new file mode 100644 index 00000000000..4a4801c799b --- /dev/null +++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.cpp @@ -0,0 +1,138 @@ +#include "blobstorage_syncer_broker.h" + +#include <ydb/core/blobstorage/vdisk/common/vdisk_log.h> + +namespace NKikimr { + + class TSyncBroker : public TActor<TSyncBroker> { + static constexpr size_t ACTIVE_SYNC_LIMIT = 8; + + std::unordered_map<TActorId, std::unordered_set<TActorId>> Active; + + struct TWaitSync { + TActorId VDiskActorId; + std::unordered_set<TActorId> ActorIds; + }; + std::list<TWaitSync> WaitQueue; // TODO: better search + + public: + static constexpr auto ActorActivityType() { + return NKikimrServices::TActivity::BS_SYNC_BROKER; + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvQuerySyncToken, Handle) + hFunc(TEvReleaseSyncToken, Handle) + ) + + TSyncBroker() + : TActor(&TSyncBroker::StateFunc) + {} + + void Handle(TEvQuerySyncToken::TPtr& ev) { + const auto vDiskActorId = ev->Get()->VDiskActorId; + const auto actorId = ev->Sender; + + if (const auto it = Active.find(vDiskActorId); it != Active.end()) { + it->second.insert(actorId); + Send(actorId, new TEvSyncToken); + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, + "TEvQuerySyncToken, VDisk actor id: " << vDiskActorId << + ", actor id: " << actorId << + ", token sent, active: " << Active.size() << + ", waiting: " << WaitQueue.size()); + return; + } + + if (Active.size() < ACTIVE_SYNC_LIMIT) { + Active[vDiskActorId].insert(actorId); + Send(actorId, new TEvSyncToken); + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, + "TEvQuerySyncToken, VDisk actor id: " << vDiskActorId << + ", actor id: " << actorId << + ", token sent, active: " << Active.size() << + ", waiting: " << WaitQueue.size()); + return; + } + + auto pred = [&vDiskActorId](const auto& item) { + return item.VDiskActorId == vDiskActorId; + }; + + if (const auto it = std::find_if(WaitQueue.begin(), WaitQueue.end(), pred); it != WaitQueue.end()) { + it->ActorIds.insert(actorId); + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, + "TEvQuerySyncToken, VDisk actor id: " << vDiskActorId << + ", actor id: " << actorId << + ", enqueued, active: " << Active.size() << + ", waiting: " << WaitQueue.size()); + return; + } + + TWaitSync sync{vDiskActorId, {actorId}}; + WaitQueue.emplace_back(std::move(sync)); + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, + "TEvQuerySyncToken, VDisk actor id: " << vDiskActorId << + ", actor id: " << actorId << + ", enqueued, active: " << Active.size() << + ", waiting: " << WaitQueue.size()); + } + + void ProcessQueue() { + while (!WaitQueue.empty() && Active.size() < ACTIVE_SYNC_LIMIT) { + const auto& waitSync = WaitQueue.front(); + for (const auto& actorId : waitSync.ActorIds) { + Send(actorId, new TEvSyncToken); + } + Active[waitSync.VDiskActorId] = std::move(waitSync.ActorIds); + WaitQueue.pop_front(); + } + } + + void Handle(TEvReleaseSyncToken::TPtr& ev) { + const auto vDiskActorId = ev->Get()->VDiskActorId; + const auto actorId = ev->Sender; + + if (const auto it = Active.find(vDiskActorId); it != Active.end()) { + it->second.erase(actorId); + if (it->second.empty()) { + Active.erase(vDiskActorId); + ProcessQueue(); + } + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, + "TEvReleaseSyncToken, VDisk actor id: " << vDiskActorId << + ", actor id: " << actorId << + ", token released, active: " << Active.size() << + ", waiting: " << WaitQueue.size()); + return; + } + + auto pred = [&vDiskActorId](const auto& item) { + return item.VDiskActorId == vDiskActorId; + }; + + if (const auto it = std::find_if(WaitQueue.begin(), WaitQueue.end(), pred); it != WaitQueue.end()) { + it->ActorIds.erase(actorId); + if (it->ActorIds.empty()) { + WaitQueue.erase(it); + } + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, + "TEvReleaseSyncToken, VDisk actor id: " << vDiskActorId << + ", actor id: " << actorId << + ", removed from queue, active: " << Active.size() << + ", waiting: " << WaitQueue.size()); + } + } + }; + + IActor *CreateSyncBrokerActor() { + return new TSyncBroker; + } + +} // NKikimr diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h new file mode 100644 index 00000000000..9e1995e280f --- /dev/null +++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_broker.h @@ -0,0 +1,30 @@ +#pragma once + +#include "defs.h" +#include <ydb/core/base/blobstorage.h> +#include <ydb/core/blobstorage/base/blobstorage_vdiskid.h> + +namespace NKikimr { + + struct TEvQuerySyncToken : public TEventLocal<TEvQuerySyncToken, TEvBlobStorage::EvQuerySyncToken> { + TActorId VDiskActorId; + + explicit TEvQuerySyncToken(const TActorId& id) + : VDiskActorId(id) + {} + }; + + struct TEvSyncToken : public TEventLocal<TEvSyncToken, TEvBlobStorage::EvSyncToken> + {}; + + struct TEvReleaseSyncToken : public TEventLocal<TEvReleaseSyncToken, TEvBlobStorage::EvReleaseSyncToken> { + TActorId VDiskActorId; + + explicit TEvReleaseSyncToken(const TActorId& id) + : VDiskActorId(id) + {} + }; + + extern IActor *CreateSyncBrokerActor(); + +} // NKikimr diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp index 7a9454ced70..871171b52ff 100644 --- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp +++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_recoverlostdata_proxy.cpp @@ -1,4 +1,5 @@ #include "blobstorage_syncer_recoverlostdata_proxy.h" +#include "blobstorage_syncer_broker.h" #include "blobstorage_syncer_committer.h" #include "blobstorage_syncer_data.h" #include "blobstorage_syncquorum.h" @@ -45,6 +46,39 @@ namespace NKikimr { TVDiskID TargetVDiskId; TActorId TargetActorId; + void Bootstrap(const TActorContext &ctx) { + LOG_DEBUG(ctx, BS_SYNCER, + VDISKP(SyncerCtx->VCtx->VDiskLogPrefix, + "TSyncerRLDFullSyncProxyActor(%s): START", + TargetVDiskId.ToString().data())); + ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvQuerySyncToken(SyncerCtx->VCtx->VDiskActorId), + IEventHandle::FlagTrackDelivery); + Become(&TThis::WaitForBrokerStateFunc); + } + + void Handle(TEvSyncToken::TPtr &ev, const TActorContext &ctx) { + Y_UNUSED(ev); + LOG_DEBUG(ctx, BS_SYNCER, + VDISKP(SyncerCtx->VCtx->VDiskLogPrefix, + "TSyncerRLDFullSyncProxyActor(%s): TEvSyncToken received", + TargetVDiskId.ToString().data())); + CreateAndRunTask(ctx); + } + + void Handle(TEvents::TEvUndelivered::TPtr &ev, const TActorContext &ctx) { + // no sync broker service + if (ev->Get()->SourceType == TEvQuerySyncToken::EventType) { + CreateAndRunTask(ctx); + } + } + + STRICT_STFUNC(WaitForBrokerStateFunc, + HFunc(TEvents::TEvPoisonPill, HandlePoison) + HFunc(TEvSyncToken, Handle) + HFunc(TEvents::TEvUndelivered, Handle) + HFunc(TEvVGenerationChange, Handle) + ) + void CreateAndRunTask(const TActorContext &ctx) { // create task auto task = std::make_unique<TSyncerJobTask>(TSyncerJobTask::EFullRecover, TargetVDiskId, TargetActorId, @@ -56,17 +90,8 @@ namespace NKikimr { Become(&TThis::WaitForSyncStateFunc); } - void Bootstrap(const TActorContext &ctx) { - LOG_DEBUG(ctx, BS_SYNCER, - VDISKP(SyncerCtx->VCtx->VDiskLogPrefix, - "TSyncerRLDFullSyncProxyActor(%s): START", - TargetVDiskId.ToString().data())); - - // run job - CreateAndRunTask(ctx); - } - void Handle(TEvSyncerJobDone::TPtr &ev, const TActorContext &ctx) { + ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvReleaseSyncToken(SyncerCtx->VCtx->VDiskActorId)); LOG_DEBUG(ctx, BS_SYNCER, VDISKP(SyncerCtx->VCtx->VDiskLogPrefix, "TSyncerRLDFullSyncProxyActor(%s): TEvSyncerJobDone; Task# %s", @@ -103,8 +128,9 @@ namespace NKikimr { void Handle(TEvSyncerRLDWakeup::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); - // run job again - CreateAndRunTask(ctx); + ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvQuerySyncToken(SyncerCtx->VCtx->VDiskActorId), + IEventHandle::FlagTrackDelivery); + Become(&TThis::WaitForBrokerStateFunc); } STRICT_STFUNC(WaitForTimeoutStateFunc, @@ -147,6 +173,7 @@ namespace NKikimr { // HandlePoison //////////////////////////////////////////////////////////////////////// void HandlePoison(TEvents::TEvPoisonPill::TPtr &ev, const TActorContext &ctx) { + ctx.Send(MakeBlobStorageSyncBrokerID(), new TEvReleaseSyncToken(SyncerCtx->VCtx->VDiskActorId)); LOG_DEBUG(ctx, BS_SYNCER, VDISKP(SyncerCtx->VCtx->VDiskLogPrefix, "TSyncerRLDFullSyncProxyActor(%s): PoisonPill", diff --git a/ydb/core/blobstorage/vdisk/syncer/ya.make b/ydb/core/blobstorage/vdisk/syncer/ya.make index fae14c75832..56746d96e5b 100644 --- a/ydb/core/blobstorage/vdisk/syncer/ya.make +++ b/ydb/core/blobstorage/vdisk/syncer/ya.make @@ -15,6 +15,8 @@ PEERDIR( SRCS( defs.h + blobstorage_syncer_broker.cpp + blobstorage_syncer_broker.h blobstorage_syncer_committer.cpp blobstorage_syncer_committer.h blobstorage_syncer.cpp diff --git a/ydb/core/blobstorage/vdisk/vdisk_services.h b/ydb/core/blobstorage/vdisk/vdisk_services.h index 8f4ad3a98cd..7cb2f6f00ea 100644 --- a/ydb/core/blobstorage/vdisk/vdisk_services.h +++ b/ydb/core/blobstorage/vdisk/vdisk_services.h @@ -7,6 +7,6 @@ namespace NKikimr { // Some services to run on a node for correct functionalily of VDisks extern IActor *CreateReplBrokerActor(ui64 maxMemBytes); + extern IActor *CreateSyncBrokerActor(); } // NKikimr - diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index bf3a85d80e7..15d6849f0ce 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1084,5 +1084,6 @@ message TActivity { REPLICATION_TRANSFER_WRITER = 657; DATA_ERASURE = 658; TENANT_DATA_ERASURE = 659; + BS_SYNC_BROKER = 660; }; }; |