diff options
author | Sergey Belyakov <serg-belyakov@ydb.tech> | 2024-05-22 15:21:38 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-22 15:21:38 +0300 |
commit | a26dfc6e3aa6cb6720ecb4ddb419709cac8bf082 (patch) | |
tree | 4fc921f964bf97fe7e39a5449ce6c59fc6dc572f | |
parent | 61d0cf5c36f7c4358c190ad1bac9b0c56df8b4db (diff) | |
download | ydb-a26dfc6e3aa6cb6720ecb4ddb419709cac8bf082.tar.gz |
Introduce local SyncLog data cutter (#4124)
32 files changed, 518 insertions, 109 deletions
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index 682bf9cd94..0008414141 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -169,8 +169,15 @@ void TNodeWarden::Bootstrap() { DsProxyPerPoolCounters = new TDsProxyPerPoolCounters(AppData()->Counters); if (actorSystem && actorSystem->AppData<TAppData>() && actorSystem->AppData<TAppData>()->Icb) { - actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching"); - actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch"); + const TIntrusivePtr<NKikimr::TControlBoard>& icb = actorSystem->AppData<TAppData>()->Icb; + + icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching"); + icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch"); + icb->RegisterSharedControl(EnableLocalSyncLogDataCutting, "VDiskControls.EnableLocalSyncLogDataCutting"); + icb->RegisterSharedControl(EnableSyncLogChunkCompressionHDD, "VDiskControls.EnableSyncLogChunkCompressionHDD"); + icb->RegisterSharedControl(EnableSyncLogChunkCompressionSSD, "VDiskControls.EnableSyncLogChunkCompressionSSD"); + icb->RegisterSharedControl(MaxSyncLogChunksInFlightHDD, "VDiskControls.MaxSyncLogChunksInFlightHDD"); + icb->RegisterSharedControl(MaxSyncLogChunksInFlightSSD, "VDiskControls.MaxSyncLogChunksInFlightSSD"); } // start replication broker diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index 985680808f..b0dc88d2fe 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -133,6 +133,12 @@ namespace NKikimr::NStorage { TControlWrapper EnablePutBatching; TControlWrapper EnableVPatch; + TControlWrapper EnableLocalSyncLogDataCutting; + TControlWrapper EnableSyncLogChunkCompressionHDD; + TControlWrapper EnableSyncLogChunkCompressionSSD; + TControlWrapper MaxSyncLogChunksInFlightHDD; + TControlWrapper MaxSyncLogChunksInFlightSSD; + TReplQuoter::TPtr ReplNodeRequestQuoter; TReplQuoter::TPtr ReplNodeResponseQuoter; @@ -148,6 +154,11 @@ namespace NKikimr::NStorage { : Cfg(cfg) , EnablePutBatching(Cfg->FeatureFlags.GetEnablePutBatchingForBlobStorage(), false, true) , EnableVPatch(Cfg->FeatureFlags.GetEnableVPatch(), false, true) + , EnableLocalSyncLogDataCutting(0, 0, 1) + , EnableSyncLogChunkCompressionHDD(1, 0, 1) + , EnableSyncLogChunkCompressionSSD(0, 0, 1) + , MaxSyncLogChunksInFlightHDD(10, 1, 1024) + , MaxSyncLogChunksInFlightSSD(10, 1, 1024) { Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1); AvailDomainId = 1; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp index 7a1fa34dbe..47ebb27134 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp @@ -174,6 +174,16 @@ namespace NKikimr::NStorage { vdiskConfig->EnableVDiskCooldownTimeout = Cfg->EnableVDiskCooldownTimeout; vdiskConfig->ReplPausedAtStart = Cfg->VDiskReplPausedAtStart; vdiskConfig->EnableVPatch = EnableVPatch; + + vdiskConfig->EnableLocalSyncLogDataCutting = EnableLocalSyncLogDataCutting; + if (deviceType == NPDisk::EDeviceType::DEVICE_TYPE_ROT) { + vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionHDD; + vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightHDD; + } else { + vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionSSD; + vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightSSD; + } + vdiskConfig->FeatureFlags = Cfg->FeatureFlags; if (Cfg->BlobStorageConfig.HasCostMetricsSettings()) { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h index f1b5d2b0c0..0012985963 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h @@ -72,7 +72,7 @@ public: appData->IoContextFactory = IoContext.get(); Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend()); - Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}}); + Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}, {}}); Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE); Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE); Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG); diff --git a/ydb/core/blobstorage/ut_blobstorage/sync.cpp b/ydb/core/blobstorage/ut_blobstorage/sync.cpp index d092eac9b9..41a32501a7 100644 --- a/ydb/core/blobstorage/ut_blobstorage/sync.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/sync.cpp @@ -1,7 +1,108 @@ +#include <ydb/core/blobstorage/ut_blobstorage/ut_helpers.h> #include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_private_events.h> +#include <util/random/random.h> Y_UNIT_TEST_SUITE(BlobStorageSync) { + + void TestCutting(TBlobStorageGroupType groupType) { + const ui32 groupSize = groupType.BlobSubgroupSize(); + + // for (ui32 mask = 0; mask < (1 << groupSize); ++mask) { // TIMEOUT + { + ui32 mask = RandomNumber(1ull << groupSize); + for (bool compressChunks : { true, false }) { + TEnvironmentSetup env{{ + .NodeCount = groupSize, + .Erasure = groupType, + }}; + + env.CreateBoxAndPool(1, 1); + std::vector<ui32> groups = env.GetGroups(); + UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1); + ui32 groupId = groups[0]; + + const ui64 tabletId = 5000; + const ui32 channel = 10; + ui32 gen = 1; + ui32 step = 1; + ui64 cookie = 1; + + ui64 totalSize = 0; + + std::vector<TControlWrapper> cutLocalSyncLogControls; + std::vector<TControlWrapper> compressChunksControls; + std::vector<TActorId> edges; + + for (ui32 nodeId = 1; nodeId <= groupSize; ++nodeId) { + cutLocalSyncLogControls.emplace_back(0, 0, 1); + compressChunksControls.emplace_back(1, 0, 1); + TAppData* appData = env.Runtime->GetNode(nodeId)->AppData.get(); + appData->Icb->RegisterSharedControl(cutLocalSyncLogControls.back(), "VDiskControls.EnableLocalSyncLogDataCutting"); + appData->Icb->RegisterSharedControl(compressChunksControls.back(), "VDiskControls.EnableSyncLogChunkCompressionHDD"); + edges.push_back(env.Runtime->AllocateEdgeActor(nodeId)); + } + + for (ui32 i = 0; i < groupSize; ++i) { + env.Runtime->WrapInActorContext(edges[i], [&] { + SendToBSProxy(edges[i], groupId, new TEvBlobStorage::TEvStatus(TInstant::Max())); + }); + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(edges[i], false); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } + + auto writeBlob = [&](ui32 nodeId, ui32 blobSize) { + TLogoBlobID blobId(tabletId, gen, step, channel, blobSize, ++cookie); + totalSize += blobSize; + TString data = MakeData(blobSize); + + const TActorId& sender = edges[nodeId - 1]; + env.Runtime->WrapInActorContext(sender, [&] () { + SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvPut(blobId, std::move(data), TInstant::Max())); + }); + }; + + env.Runtime->FilterFunction = [&](ui32/* nodeId*/, std::unique_ptr<IEventHandle>& ev) { + switch(ev->Type) { + case TEvBlobStorage::TEvPutResult::EventType: + UNIT_ASSERT_VALUES_EQUAL(ev->Get<TEvBlobStorage::TEvPutResult>()->Status, NKikimrProto::OK); + return false; + default: + return true; + } + }; + + while (totalSize < 16_MB) { + writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB)); + } + env.Sim(TDuration::Minutes(5)); + + for (ui32 i = 0; i < groupSize; ++i) { + cutLocalSyncLogControls[i] = !!(mask & (1 << i)); + compressChunksControls[i] = compressChunks; + } + + while (totalSize < 32_MB) { + writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB)); + } + + env.Sim(TDuration::Minutes(5)); + } + } + } + + Y_UNIT_TEST(TestSyncLogCuttingMirror3dc) { + TestCutting(TBlobStorageGroupType::ErasureMirror3dc); + } + + Y_UNIT_TEST(TestSyncLogCuttingMirror3of4) { + TestCutting(TBlobStorageGroupType::ErasureMirror3of4); + } + + Y_UNIT_TEST(TestSyncLogCuttingBlock4Plus2) { + TestCutting(TBlobStorageGroupType::Erasure4Plus2Block); + } + Y_UNIT_TEST(SyncWhenDiskGetsDown) { return; // re-enable when protocol issue is resolved diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h b/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h index 5fb44fa60a..163aa45126 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h +++ b/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h @@ -7,6 +7,11 @@ namespace NKikimr { TString MakeData(ui32 dataSize); +template<typename Int1 = ui32, typename Int2 = ui32> +inline Int1 GenerateRandom(Int1 min, Int2 max) { + return min + RandomNumber(max - min); +} + class TInflightActor : public TActorBootstrapped<TInflightActor> { public: struct TSettings { diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp index f4a2113a5b..8a0f784f6c 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp @@ -63,6 +63,8 @@ namespace NKikimr { SyncLogAdvisedIndexedBlockSize = ui32(1) << ui32(20); // 1 MB SyncLogMaxMemAmount = ui64(64) << ui64(20); // 64 MB + MaxSyncLogChunkSize = ui32(16) << ui32(10); // 32 Kb + ReplTimeInterval = TDuration::Seconds(60); // 60 seconds ReplRequestTimeout = TDuration::Seconds(10); // 10 seconds ReplPlanQuantum = TDuration::MilliSeconds(100); // 100 ms diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.h b/ydb/core/blobstorage/vdisk/common/vdisk_config.h index a2335182c4..35a9ec4e68 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_config.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.h @@ -154,6 +154,11 @@ namespace NKikimr { ui32 SyncLogAdvisedIndexedBlockSize; ui64 SyncLogMaxMemAmount; + TControlWrapper EnableLocalSyncLogDataCutting; + TControlWrapper EnableSyncLogChunkCompression; + TControlWrapper MaxSyncLogChunksInFlight; + ui32 MaxSyncLogChunkSize; + ///////////// REPL SETTINGS ///////////////////////// TDuration ReplTimeInterval; TDuration ReplRequestTimeout; diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp index 393c966eea..fedaca0421 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp @@ -513,14 +513,11 @@ namespace NKikimr { ///////////////// SYNC ////////////////////////////////////////////////////// TLsnSeg THull::AllocateLsnForSyncDataCmd(const TString &data) { - // count number of elements - ui32 counter = 0; - auto count = [&counter] (const void *) { counter++; }; - // do job - count all elements - NSyncLog::TFragmentReader(data).ForEach(count, count, count, count); + NSyncLog::TFragmentReader fragment(data); // allocate LsnSeg; we reserve a diapason of lsns since we put multiple records - ui64 lsnAdvance = counter; + std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords(); + ui64 lsnAdvance = records.size(); Y_ABORT_UNLESS(lsnAdvance > 0); auto seg = Fields->LsnMngr->AllocLsnForHull(lsnAdvance); @@ -536,7 +533,9 @@ namespace NKikimr { curLsn++; }; // do job - update blocks cache - NSyncLog::TFragmentReader(data).ForEach(otherHandler, blockHandler, otherHandler, blockHandlerV2); + for (const NSyncLog::TRecordHdr* rec : records) { + NSyncLog::HandleRecordHdr(rec, otherHandler, blockHandler, otherHandler, blockHandlerV2); + } // check that all records are applied Y_DEBUG_ABORT_UNLESS(curLsn == seg.Last + 1); diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp index ee7022da11..fb7f0875a3 100644 --- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp +++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp @@ -429,10 +429,9 @@ namespace NKikimr { void ApplySyncDataByRecord(const TActorContext &ctx, ui64 recordLsn) { // count number of records - ui64 recsNum = 0; - auto count = [&recsNum] (const void *) { recsNum++; }; NSyncLog::TFragmentReader fragment(LocalSyncDataMsg.Data); - fragment.ForEach(count, count, count, count); + std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords(); + ui64 recsNum = records.size(); // calculate lsn Y_DEBUG_ABORT_UNLESS(recordLsn >= recsNum, "recordLsn# %" PRIu64 " recsNum# %" PRIu64, @@ -465,7 +464,9 @@ namespace NKikimr { }; // apply local sync data - fragment.ForEach(blobHandler, blockHandler, barrierHandler, blockHandlerV2); + for (const NSyncLog::TRecordHdr* rec : records) { + NSyncLog::HandleRecordHdr(rec, blobHandler, blockHandler, barrierHandler, blockHandlerV2); + } } void PutLogoBlobsBatchToHull( diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp index 2173cf4c64..ba013028ed 100644 --- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp +++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp @@ -1,5 +1,6 @@ #include "blobstorage_syncer_localwriter.h" #include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h> +#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h> namespace NKikimr { @@ -186,5 +187,118 @@ namespace NKikimr { return new TLocalSyncDataExtractorActor(vctx, skeletonId, parentId, std::move(ev)); } + /////////////////////////////////////////////////////////////////////////////////////////////// + // TLocalSyncDataCutterActor -- actor extracts data from TEvLocalSyncData, cuts it into + // smaller chunks and sends in multiple messages to Skeleton + /////////////////////////////////////////////////////////////////////////////////////////////// + class TLocalSyncDataCutterActor : public TActorBootstrapped<TLocalSyncDataCutterActor> { + TIntrusivePtr<TVDiskConfig> VConfig; + TIntrusivePtr<TVDiskContext> VCtx; + TActorId SkeletonId; + TActorId ParentId; + std::unique_ptr<TEvLocalSyncData> Ev; + std::vector<TString> Chunks; + + ui32 ChunksInFlight = 0; + bool CompressChunks; + ui32 MaxChunksInFlight; + ui32 MaxChunksSize; + + public: + void Bootstrap(const TActorContext&) { + THPTimer timer; + std::unique_ptr<NSyncLog::TNaiveFragmentWriter> fragmentWriter; + + if (CompressChunks) { + fragmentWriter.reset(new NSyncLog::TLz4FragmentWriter); + } else { + fragmentWriter.reset(new NSyncLog::TNaiveFragmentWriter); + } + + auto addChunk = [&]() { + if (fragmentWriter->GetSize()) { + TString chunk; + fragmentWriter->Finish(&chunk); + Chunks.emplace_back(std::move(chunk)); + fragmentWriter->Clear(); + } + }; + + NSyncLog::TFragmentReader fragmentReader(Ev->Data); + std::vector<const NSyncLog::TRecordHdr*> records = fragmentReader.ListRecords(); + for (const NSyncLog::TRecordHdr* rec : records) { + if (fragmentWriter->GetSize() + rec->GetSize() > MaxChunksSize) { + addChunk(); + } + fragmentWriter->Push(rec, rec->GetSize()); + } + addChunk(); + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, VCtx->VDiskLogPrefix + << "TLocalSyncDataCutterActor: VDiskId# " << Ev->VDiskID.ToString() + << " dataSize# " << Ev->Data.size() + << " duration# " << TDuration::Seconds(timer.Passed())); + + Become(&TThis::StateFunc); + SendChunks(); + } + + void Finish(const NKikimrProto::EReplyStatus& status) { + Send(ParentId, new TEvLocalSyncDataResult(status, TAppData::TimeProvider->Now(), nullptr, nullptr)); + PassAway(); + } + + void Handle(const TEvLocalSyncDataResult::TPtr& ev) { + if (ev->Get()->Status == NKikimrProto::OK) { + --ChunksInFlight; + if (Chunks.empty() && ChunksInFlight == 0) { + Finish(NKikimrProto::OK); + } else { + SendChunks(); + } + } else { + Finish(ev->Get()->Status); + } + } + + void SendChunks() { + while (ChunksInFlight < MaxChunksInFlight && !Chunks.empty()) { + Send(SkeletonId, new TEvLocalSyncData(Ev->VDiskID, Ev->SyncState, std::move(Chunks.back()))); + Chunks.pop_back(); + ++ChunksInFlight; + } + } + + public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::VDISK_LOCALSYNCDATA_CUTTER; + } + + TLocalSyncDataCutterActor( + const TIntrusivePtr<TVDiskConfig>& vconfig, + const TIntrusivePtr<TVDiskContext>& vctx, + const TActorId& skeletonId, + const TActorId& parentId, + std::unique_ptr<TEvLocalSyncData> ev) + : VCtx(vctx) + , SkeletonId(skeletonId) + , ParentId(parentId) + , Ev(std::move(ev)) + , CompressChunks(vconfig->MaxSyncLogChunksInFlight) + , MaxChunksInFlight(vconfig->MaxSyncLogChunksInFlight) + , MaxChunksSize(vconfig->MaxSyncLogChunkSize) + {} + + STRICT_STFUNC(StateFunc, { + hFunc(TEvLocalSyncDataResult, Handle); + }) + + }; + + IActor* CreateLocalSyncDataCutter(const TIntrusivePtr<TVDiskConfig>& vconfig, const TIntrusivePtr<TVDiskContext>& vctx, + const TActorId& skeletonId, const TActorId& parentId, std::unique_ptr<TEvLocalSyncData> ev) { + return new TLocalSyncDataCutterActor(vconfig, vctx, skeletonId, parentId, std::move(ev)); + } + } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h index 1bb55b311f..779acfd31a 100644 --- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h +++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h @@ -69,4 +69,10 @@ namespace NKikimr { IActor *CreateLocalSyncDataExtractor(const TIntrusivePtr<TVDiskContext> &vctx, const TActorId &skeletonId, const TActorId &parentId, std::unique_ptr<TEvLocalSyncData> ev); + /////////////////////////////////////////////////////////////////////////////////////////////// + // CreateLocalSyncDataCutter + /////////////////////////////////////////////////////////////////////////////////////////////// + IActor* CreateLocalSyncDataCutter(const TIntrusivePtr<TVDiskConfig>& vconfig, const TIntrusivePtr<TVDiskContext>& vctx, + const TActorId& skeletonId, const TActorId& parentId, std::unique_ptr<TEvLocalSyncData> ev); + } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp b/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp index 29ec0237ed..7378a171fa 100644 --- a/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp +++ b/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp @@ -193,14 +193,21 @@ namespace NKikimr { auto syncState = GetCurrent().SyncState; auto msg = std::make_unique<TEvLocalSyncData>(vdisk, syncState, data); + if (Ctx->SyncerCtx->Config->EnableLocalSyncLogDataCutting) { + std::unique_ptr<IActor> actor(CreateLocalSyncDataCutter(Ctx->SyncerCtx->Config, + Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, parentId, std::move(msg))); + return TSjOutcome::Actor(std::move(actor), true); + } else { + #ifdef UNPACK_LOCALSYNCDATA - std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, - parentId, std::move(msg))); - return TSjOutcome::Actor(actor.Release(), true); + std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, + parentId, std::move(msg))); + return TSjOutcome::Actor(actor.Release(), true); #else - Y_UNUSED(parentId); - return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg)); + Y_UNUSED(parentId); + return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg)); #endif + } } else { if (!EndOfStream) { Ctx->SyncerCtx->VCtx->Logger(NActors::NLog::PRI_ERROR, BS_SYNCER, @@ -331,14 +338,22 @@ namespace NKikimr { // SyncState position. Phase = EWaitLocal; auto msg = std::make_unique<TEvLocalSyncData>(vdisk, OldSyncState, data); + + if (Ctx->SyncerCtx->Config->EnableLocalSyncLogDataCutting) { + std::unique_ptr<IActor> actor(CreateLocalSyncDataCutter(Ctx->SyncerCtx->Config, + Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, parentId, std::move(msg))); + return TSjOutcome::Actor(std::move(actor), true); + } else { + auto msg = std::make_unique<TEvLocalSyncData>(vdisk, OldSyncState, data); #ifdef UNPACK_LOCALSYNCDATA - std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, - parentId, std::move(msg))); - return TSjOutcome::Actor(actor.Release(), true); + std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, + parentId, std::move(msg))); + return TSjOutcome::Actor(actor.Release(), true); #else - Y_UNUSED(parentId); - return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg)); + Y_UNUSED(parentId); + return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg)); #endif + } } else { if (!EndOfStream) { Ctx->SyncerCtx->VCtx->Logger(NActors::NLog::PRI_ERROR, BS_SYNCER, diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp index e356f0a205..d6a86cff3d 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp @@ -11,29 +11,13 @@ namespace NKikimr { ForEach(Data, fblob, fblock, fbar, fblock2); } - void TNaiveFragmentReader::ForEach(const TString &d, TReadLogoBlobRec fblob, TReadBlockRec fblock, - TReadBarrierRec fbar, TReadBlockRecV2 fblock2) { - const TRecordHdr *begin = (const TRecordHdr *)(d.data()); - const TRecordHdr *end = (const TRecordHdr *)(d.data() + d.size()); - - for (const TRecordHdr *it = begin; it < end; it = it->Next()) { - switch (it->RecType) { - case TRecordHdr::RecLogoBlob: - fblob(it->GetLogoBlob()); - break; - case TRecordHdr::RecBlock: - fblock(it->GetBlock()); - break; - case TRecordHdr::RecBarrier: - fbar(it->GetBarrier()); - break; - case TRecordHdr::RecBlockV2: - fblock2(it->GetBlockV2()); - break; - default: - Y_ABORT("Unknown RecType: %s", it->ToString().data()); - } - } + std::vector<const TRecordHdr*> TNaiveFragmentReader::ListRecords() { + std::vector<const TRecordHdr*> records = {}; + TWriteRecordToList writeToList{records}; + + ForEach(Data, writeToList, writeToList, writeToList, writeToList); + + return std::move(records); } bool TNaiveFragmentReader::Check(TString &errorString) { @@ -62,55 +46,30 @@ namespace NKikimr { } } + std::vector<const TRecordHdr*> TLz4FragmentReader::ListRecords() { + Decompress(); + std::vector<const TRecordHdr*> records = {}; + TWriteRecordToList writeToList{records}; + + TNaiveFragmentReader::ForEach(Uncompressed, writeToList, writeToList, writeToList, writeToList); + + return std::move(records); + } + //////////////////////////////////////////////////////////////////////////// // TBaseOrderedFragmentReader //////////////////////////////////////////////////////////////////////////// - void TBaseOrderedFragmentReader::ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) { - Decompress(); - using THeapItem = std::tuple<TRecordHdr::ESyncLogRecType, const void*, ui32>; - auto comp = [](const THeapItem& x, const THeapItem& y) { return std::get<2>(y) < std::get<2>(x); }; + void TBaseOrderedFragmentReader::ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) { + return ForEachImpl(fblob, fblock, fbar, fblock2); + } - TStackVec<THeapItem, 4> heap; + std::vector<const TRecordHdr*> TBaseOrderedFragmentReader::ListRecords() { + std::vector<const TRecordHdr*> records; + TWriteRecordToList writeToList{records}; -#define ADD_HEAP(NAME, TYPE) \ - if (!Records.NAME.empty()) { \ - heap.emplace_back(TRecordHdr::TYPE, &Records.NAME.front(), Records.NAME.front().Counter); \ - } - ADD_HEAP(LogoBlobs, RecLogoBlob) - ADD_HEAP(Blocks, RecBlock) - ADD_HEAP(Barriers, RecBarrier) - ADD_HEAP(BlocksV2, RecBlockV2) - - std::make_heap(heap.begin(), heap.end(), comp); - - while (!heap.empty()) { - std::pop_heap(heap.begin(), heap.end(), comp); - auto& item = heap.back(); // say thanks to Microsoft compiler for not supporting tuple binding correctly - auto& type = std::get<0>(item); - auto& ptr = std::get<1>(item); - auto& counter = std::get<2>(item); - switch (type) { -#define PROCESS(NAME, TYPE, FUNC) \ - case TRecordHdr::TYPE: { \ - using T = std::decay_t<decltype(Records.NAME)>::value_type; \ - const T *item = static_cast<const T*>(ptr); \ - FUNC(item); \ - if (++item != Records.NAME.data() + Records.NAME.size()) { \ - ptr = item; \ - counter = item->Counter; \ - std::push_heap(heap.begin(), heap.end(), comp); \ - } else { \ - heap.pop_back(); \ - } \ - break; \ - } - PROCESS(LogoBlobs, RecLogoBlob, fblob) - PROCESS(Blocks, RecBlock, fblock) - PROCESS(Barriers, RecBarrier, fbar) - PROCESS(BlocksV2, RecBlockV2, fblock2) - } - } + ForEachImpl(writeToList, writeToList, writeToList, writeToList); + return std::move(records); } //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h index d3c7780e92..c89a656fab 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h @@ -15,6 +15,35 @@ namespace NKikimr { using TReadBarrierRec = std::function<void(const TBarrierRec *)>; using TReadBlockRecV2 = std::function<void(const TBlockRecV2 *)>; + struct TWriteRecordToList { + void operator()(const void* ptr) { + Records.push_back((const TRecordHdr*)((char*)ptr - sizeof(TRecordHdr))); + }; + + std::vector<const TRecordHdr*>& Records; + }; + + + template <class TLogoBlobCallback, class TBlockCallback, class TReadBarrierCallback, class TBlockV2Callback> + void HandleRecordHdr(const TRecordHdr* rec, TLogoBlobCallback fblob, TBlockCallback fblock, TReadBarrierCallback fbar, + TBlockV2Callback fblock2) { + switch (rec->RecType) { + case TRecordHdr::RecLogoBlob: + fblob(rec->GetLogoBlob()); + break; + case TRecordHdr::RecBlock: + fblock(rec->GetBlock()); + break; + case TRecordHdr::RecBarrier: + fbar(rec->GetBarrier()); + break; + case TRecordHdr::RecBlockV2: + fblock2(rec->GetBlockV2()); + break; + default: + Y_ABORT("Unknown RecType: %s", rec->ToString().data()); + } + } //////////////////////////////////////////////////////////////////////////// // IFragmentReader @@ -23,6 +52,7 @@ namespace NKikimr { public: virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) = 0; virtual bool Check(TString &errorString) = 0; + virtual std::vector<const TRecordHdr*> ListRecords() = 0; virtual ~IFragmentReader() {} }; @@ -38,10 +68,21 @@ namespace NKikimr { virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) override; virtual bool Check(TString &errorString) override; + virtual std::vector<const TRecordHdr*> ListRecords() override; protected: const TString &Data; - void ForEach(const TString &d, TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2); + + template <class TLogoBlobCallback, class TBlockCallback, class TReadBarrierCallback, class TBlockV2Callback> + void ForEach(const TString &d, TLogoBlobCallback fblob, TBlockCallback fblock, TReadBarrierCallback fbar, + TBlockV2Callback fblock2) { + const TRecordHdr* begin = (const TRecordHdr*)(d.data()); + const TRecordHdr* end = (const TRecordHdr*)(d.data() + d.size()); + + for (const TRecordHdr* it = begin; it < end; it = it->Next()) { + HandleRecordHdr(it, fblob, fblock, fbar, fblock2); + } + } }; @@ -57,6 +98,7 @@ namespace NKikimr { virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) override; virtual bool Check(TString &errorString) override; + virtual std::vector<const TRecordHdr*> ListRecords() override; private: mutable TString Uncompressed; @@ -69,9 +111,60 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// class TBaseOrderedFragmentReader : public IFragmentReader { public: + virtual std::vector<const TRecordHdr*> ListRecords() override; virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) override; protected: + template <class TLogoBlobCallback, class TBlockCallback, class TReadBarrierCallback, class TBlockV2Callback> + void ForEachImpl(TLogoBlobCallback fblob, TBlockCallback fblock, TReadBarrierCallback fbar, + TBlockV2Callback + fblock2) { + Decompress(); + + using THeapItem = std::tuple<TRecordHdr::ESyncLogRecType, const void*, ui32>; + auto comp = [](const THeapItem& x, const THeapItem& y) { return std::get<2>(y) < std::get<2>(x); }; + + TStackVec<THeapItem, 4> heap; + + #define ADD_HEAP(NAME, TYPE) \ + if (!Records.NAME.empty()) { \ + heap.emplace_back(TRecordHdr::TYPE, &Records.NAME.front(), Records.NAME.front().Counter); \ + } + ADD_HEAP(LogoBlobs, RecLogoBlob) + ADD_HEAP(Blocks, RecBlock) + ADD_HEAP(Barriers, RecBarrier) + ADD_HEAP(BlocksV2, RecBlockV2) + + std::make_heap(heap.begin(), heap.end(), comp); + + while (!heap.empty()) { + std::pop_heap(heap.begin(), heap.end(), comp); + auto& item = heap.back(); // say thanks to Microsoft compiler for not supporting tuple binding correctly + auto& type = std::get<0>(item); + auto& ptr = std::get<1>(item); + auto& counter = std::get<2>(item); + switch (type) { + #define PROCESS(NAME, TYPE, FUNC) \ + case TRecordHdr::TYPE: { \ + using T = std::decay_t<decltype(Records.NAME)>::value_type; \ + const T *item = static_cast<const T*>(ptr); \ + FUNC(item); \ + if (++item != Records.NAME.data() + Records.NAME.size()) { \ + ptr = item; \ + counter = item->Counter; \ + std::push_heap(heap.begin(), heap.end(), comp); \ + } else { \ + heap.pop_back(); \ + } \ + break; \ + } + PROCESS(LogoBlobs, RecLogoBlob, fblob) + PROCESS(Blocks, RecBlock, fblock) + PROCESS(Barriers, RecBarrier, fbar) + PROCESS(BlocksV2, RecBlockV2, fblock2) + } + } + } TRecordsWithSerial Records; virtual bool Decompress() = 0; @@ -123,6 +216,10 @@ namespace NKikimr { public: TFragmentReader(const TString &data); + std::vector<const TRecordHdr*> ListRecords() { + return Impl->ListRecords(); + } + void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) { Impl->ForEach(fblob, fblock, fbar, fblock2); } diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp index ab2b6dc161..b3611db53c 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp @@ -11,6 +11,11 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// // TNaiveFragmentWriter //////////////////////////////////////////////////////////////////////////// + void TNaiveFragmentWriter::Clear() { + Chain = {{64 << 10}}; // 64 KiB initial storage + DataSize = 0; + } + void TNaiveFragmentWriter::Finish(TString *respData) { respData->clear(); respData->reserve(DataSize); diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h index a31582e83d..998c60e060 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h @@ -18,10 +18,11 @@ namespace NKikimr { size_t DataSize; public: - TNaiveFragmentWriter() - : Chain({{64 << 10}}) // 64 KiB initial storage - , DataSize(0) - {} + TNaiveFragmentWriter() { + Clear(); + } + + virtual ~TNaiveFragmentWriter() = default; size_t GetSize() const { return DataSize; @@ -48,7 +49,9 @@ namespace NKikimr { } } - void Finish(TString *respData); + void Clear(); + + virtual void Finish(TString *respData); }; //////////////////////////////////////////////////////////////////////////// @@ -60,7 +63,7 @@ namespace NKikimr { : TNaiveFragmentWriter() {} - void Finish(TString *respData); + virtual void Finish(TString *respData) override; }; //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 3f6337385e..17049b6f0f 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1228,12 +1228,41 @@ message TImmediateControlsConfig { DefaultValue: 8388608 }]; } + message TVDiskControls { + optional uint64 EnableLocalSyncLogDataCutting = 1 [(ControlOptions) = { + Description: "Allow cutting large TEvLocalSyncLogData messages into smaller chunks", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; + optional uint64 EnableSyncLogChunkCompressionHDD = 2 [(ControlOptions) = { + Description: "Compress SyncLog chunks before writing to log, setting for HDD", + MinValue: 0, + MaxValue: 1, + DefaultValue: 1 }]; + optional uint64 EnableSyncLogChunkCompressionSSD = 3 [(ControlOptions) = { + Description: "Compress SyncLog chunks before writing to log, setting for SSD", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; + optional uint64 MaxSyncLogChunksInFlightHDD = 4 [(ControlOptions) = { + Description: "Maximum number of SyncLog chunks written simultaneously to log, setting for HDD", + MinValue: 1, + MaxValue: 1024, + DefaultValue: 10 }]; + optional uint64 MaxSyncLogChunksInFlightSSD = 5 [(ControlOptions) = { + Description: "Maximum number of SyncLog chunks written simultaneously to log, setting for SSD", + MinValue: 1, + MaxValue: 1024, + DefaultValue: 10 }]; + } + optional TDataShardControls DataShardControls = 1; optional TTxLimitControls TxLimitControls = 2; optional TCoordinatorControls CoordinatorControls = 3; optional TSchemeShardControls SchemeShardControls = 4; optional TTCMallocControls TCMallocControls = 5; reserved 6; + optional TVDiskControls VDiskControls = 7; }; message TMeteringConfig { diff --git a/ydb/core/quoter/ut_helpers.cpp b/ydb/core/quoter/ut_helpers.cpp index ca36d439c9..61b582b76e 100644 --- a/ydb/core/quoter/ut_helpers.cpp +++ b/ydb/core/quoter/ut_helpers.cpp @@ -180,7 +180,7 @@ TKesusProxyTestSetup::TKesusProxyTestSetup() { } TTestActorRuntime::TEgg MakeEgg() { - return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr }; + return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} }; } void TKesusProxyTestSetup::Start() { diff --git a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp index 83189b339d..ec7cd7e473 100644 --- a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp +++ b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp @@ -13,7 +13,7 @@ Y_UNIT_TEST_SUITE(PartitionStats) { TTestActorRuntime::TEgg MakeEgg() { - return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr }; + return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} }; } void WaitForBootstrap(TTestActorRuntime &runtime) { diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h index 75f69ca4e5..b2b53302f1 100644 --- a/ydb/core/tablet_flat/test/libs/exec/runner.h +++ b/ydb/core/tablet_flat/test/libs/exec/runner.h @@ -46,7 +46,7 @@ namespace NFake { auto *types = NTable::NTest::DbgRegistry(); auto *app = new TAppData(0, 0, 0, 0, { }, types, nullptr, nullptr, nullptr); - Env.Initialize({ app, nullptr, nullptr }); + Env.Initialize({ app, nullptr, nullptr, {} }); Env.SetDispatchTimeout(DEFAULT_DISPATCH_TIMEOUT); Env.SetLogPriority(NKikimrServices::FAKE_ENV, NActors::NLog::PRI_INFO); diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 0bc5dbb9e9..51341a7625 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -154,6 +154,10 @@ namespace NActors { nodeAppData->GraphConfig = app0->GraphConfig; nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot; nodeAppData->IoContextFactory = app0->IoContextFactory; + if (nodeIndex < egg.Icb.size()) { + nodeAppData->Icb = std::move(egg.Icb[nodeIndex]); + nodeAppData->InFlightLimiterRegistry.Reset(new NKikimr::NGRpcService::TInFlightLimiterRegistry(nodeAppData->Icb)); + } if (KeyConfigGenerator) { nodeAppData->KeyConfig = KeyConfigGenerator(nodeIndex); } else { diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index 4ca29fe1e1..3b627b1e15 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -3,6 +3,7 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/mon/mon.h> #include <ydb/core/base/memobserver.h> +#include <ydb/core/control/immediate_control_board_impl.h> #include <ydb/core/protos/shared_cache.pb.h> #include <ydb/library/actors/testlib/test_runtime.h> @@ -47,6 +48,7 @@ namespace NActors { TAutoPtr<NKikimr::TAppData> App0; TAutoPtr<NActors::IDestructable> Opaque; TKeyConfigGenerator KeyConfigGenerator; + std::vector<TIntrusivePtr<NKikimr::TControlBoard>> Icb; }; TTestActorRuntime(THeSingleSystemEnv d); diff --git a/ydb/core/testlib/actors/test_runtime_ut.cpp b/ydb/core/testlib/actors/test_runtime_ut.cpp index b2c7ec58cf..d649df72fc 100644 --- a/ydb/core/testlib/actors/test_runtime_ut.cpp +++ b/ydb/core/testlib/actors/test_runtime_ut.cpp @@ -12,7 +12,7 @@ Y_UNIT_TEST_SUITE(TActorTest) { TTestActorRuntime::TEgg MakeEgg() { return - { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr }; + { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} }; } Y_UNIT_TEST(TestHandleEvent) { diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp index b27ca91cac..b4c8ff3868 100644 --- a/ydb/core/testlib/basics/appdata.cpp +++ b/ydb/core/testlib/basics/appdata.cpp @@ -71,7 +71,7 @@ namespace NKikimr { NKikimrProto::TKeyConfig(); }; - return { app, Mine.Release(), keyGenerator}; + return { app, Mine.Release(), keyGenerator, std::move(Icb) }; } void TAppPrepare::AddDomain(TDomainsInfo::TDomain* domain) @@ -200,4 +200,11 @@ namespace NKikimr { { AwsCompatibilityConfig.SetAwsRegion(value); } + + void TAppPrepare::InitIcb(ui32 numNodes) + { + for (ui32 i = 0; i < numNodes; ++i) { + Icb.emplace_back(new TControlBoard); + } + } } diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h index c4a43bb444..bf2aaf125b 100644 --- a/ydb/core/testlib/basics/appdata.h +++ b/ydb/core/testlib/basics/appdata.h @@ -87,6 +87,7 @@ namespace NKikimr { void SetEnablePqBilling(std::optional<bool> value); void SetEnableDbCounters(bool value); void SetAwsRegion(const TString& value); + void InitIcb(ui32 numNodes); TIntrusivePtr<TChannelProfiles> Channels; NKikimrBlobStorage::TNodeWardenServiceSet BSConf; @@ -104,6 +105,8 @@ namespace NKikimr { NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig; NKikimrConfig::TS3ProxyResolverConfig S3ProxyResolverConfig; NKikimrConfig::TGraphConfig GraphConfig; + NKikimrConfig::TImmediateControlsConfig ImmediateControlsConfig; + std::vector<TIntrusivePtr<NKikimr::TControlBoard>> Icb; private: TAutoPtr<TMine> Mine; diff --git a/ydb/core/testlib/basics/helpers.h b/ydb/core/testlib/basics/helpers.h index 9545275794..3a77ce61fa 100644 --- a/ydb/core/testlib/basics/helpers.h +++ b/ydb/core/testlib/basics/helpers.h @@ -53,6 +53,8 @@ namespace NFake { void SetupSchemeCache(TTestActorRuntime& runtime, ui32 nodeIndex, const TString& root); void SetupQuoterService(TTestActorRuntime& runtime, ui32 nodeIndex); void SetupSysViewService(TTestActorRuntime& runtime, ui32 nodeIndex); + void SetupIcb(TTestActorRuntime& runtime, ui32 nodeIndex, const NKikimrConfig::TImmediateControlsConfig& config, + const TIntrusivePtr<NKikimr::TControlBoard>& icb); // StateStorage, NodeWarden, TabletResolver, ResourceBroker, SharedPageCache void SetupBasicServices(TTestActorRuntime &runtime, TAppPrepare &app, bool mockDisk = false, diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp index 9d98ea164a..a6ec2f394d 100644 --- a/ydb/core/testlib/basics/services.cpp +++ b/ydb/core/testlib/basics/services.cpp @@ -9,6 +9,8 @@ #include <ydb/core/base/statestorage_impl.h> #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/base/tablet_resolver.h> +#include <ydb/core/cms/console/immediate_controls_configurator.h> +#include <ydb/core/control/immediate_control_board_actor.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h> #include <ydb/core/quoter/quoter_service.h> @@ -40,6 +42,20 @@ namespace NPDisk { extern const ui64 YdbDefaultPDiskSequence = 0x7e5700007e570000; } + void SetupIcb(TTestActorRuntime& runtime, ui32 nodeIndex, const NKikimrConfig::TImmediateControlsConfig& config, + const TIntrusivePtr<NKikimr::TControlBoard>& icb) + { + runtime.AddLocalService(MakeIcbId(runtime.GetNodeId(nodeIndex)), + TActorSetupCmd(CreateImmediateControlActor(icb, runtime.GetDynamicCounters(nodeIndex)), + TMailboxType::ReadAsFilled, 0), + nodeIndex); + + runtime.AddLocalService(TActorId{}, + TActorSetupCmd(NConsole::CreateImmediateControlsConfigurator(icb, config), + TMailboxType::ReadAsFilled, 0), + nodeIndex); + } + void SetupBSNodeWarden(TTestActorRuntime& runtime, ui32 nodeIndex, TIntrusivePtr<TNodeWardenConfig> nodeWardenConfig) { runtime.AddLocalService(MakeBlobStorageNodeWardenID(runtime.GetNodeId(nodeIndex)), @@ -325,12 +341,17 @@ namespace NPDisk { app.AddHive(0); } + while (app.Icb.size() < runtime.GetNodeCount()) { + app.Icb.emplace_back(new TControlBoard); + } + for (ui32 nodeIndex = 0; nodeIndex < runtime.GetNodeCount(); ++nodeIndex) { SetupStateStorageGroups(runtime, nodeIndex); NKikimrProto::TKeyConfig keyConfig; if (const auto it = app.Keys.find(nodeIndex); it != app.Keys.end()) { keyConfig = it->second; } + SetupIcb(runtime, nodeIndex, app.ImmediateControlsConfig, app.Icb[nodeIndex]); SetupBSNodeWarden(runtime, nodeIndex, disk.MakeWardenConf(*app.Domains, keyConfig)); SetupTabletResolver(runtime, nodeIndex); diff --git a/ydb/core/testlib/tenant_runtime.cpp b/ydb/core/testlib/tenant_runtime.cpp index 64f2af721a..994928e4cf 100644 --- a/ydb/core/testlib/tenant_runtime.cpp +++ b/ydb/core/testlib/tenant_runtime.cpp @@ -820,6 +820,7 @@ void TTenantTestRuntime::Setup(bool createTenantPools) TAppPrepare app; app.FeatureFlags = Extension.GetFeatureFlags(); + app.ImmediateControlsConfig = Extension.GetImmediateControlsConfig(); app.ClearDomainsAndHive(); ui32 planResolution = 500; @@ -854,7 +855,9 @@ void TTenantTestRuntime::Setup(bool createTenantPools) app.AddHive(Config.HiveId); } - for (size_t i = 0; i< Config.Nodes.size(); ++i) { + app.InitIcb(Config.Nodes.size()); + + for (size_t i = 0; i < Config.Nodes.size(); ++i) { AddLocalService(NNodeWhiteboard::MakeNodeWhiteboardServiceId(GetNodeId(i)), TActorSetupCmd(new TFakeNodeWhiteboardService, TMailboxType::Simple, 0), i); } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 85c69635ef..4093006a74 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -185,6 +185,8 @@ namespace Tests { app.SetAwsRegion(Settings->AwsRegion); app.CompactionConfig = Settings->CompactionConfig; app.FeatureFlags = Settings->FeatureFlags; + app.ImmediateControlsConfig = Settings->Controls; + app.InitIcb(StaticNodes() + DynamicNodes()); Runtime = MakeHolder<TTestBasicRuntime>(StaticNodes() + DynamicNodes(), Settings->UseRealThreads); @@ -268,14 +270,9 @@ namespace Tests { // for (ui32 nodeIdx = 0; nodeIdx < StaticNodes(); ++nodeIdx) { SetupDomainLocalService(nodeIdx); - SetupConfigurators(nodeIdx); SetupProxies(nodeIdx); } - for (ui32 nodeIdx = StaticNodes(); nodeIdx < StaticNodes() + DynamicNodes(); ++nodeIdx) { - SetupConfigurators(nodeIdx); - } - CreateBootstrapTablets(); SetupStorage(); } diff --git a/ydb/library/ncloud/impl/access_service_ut.cpp b/ydb/library/ncloud/impl/access_service_ut.cpp index 3ac708f071..fd8f15c085 100644 --- a/ydb/library/ncloud/impl/access_service_ut.cpp +++ b/ydb/library/ncloud/impl/access_service_ut.cpp @@ -34,7 +34,7 @@ struct TTestSetup : public NUnitTest::TBaseFixture { TTestActorRuntime::TEgg MakeEgg() { return - { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr }; + { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} }; } void Init() { diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index 6e3dfaa488..2a70e16707 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -1032,5 +1032,6 @@ message TActivity { BACKUP_CONTROLLER_TABLET = 635; REPLICATION_CONTROLLER_SECRET_RESOLVER = 636; REPLICATION_CONTROLLER_DST_ALTERER = 637; + VDISK_LOCALSYNCDATA_CUTTER = 638; }; }; |