diff options
author | Andrew Berlin <aberlin@ydb.tech> | 2024-04-26 18:52:06 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-26 18:52:06 +0300 |
commit | db1b29e584d1ffb699262c4895caac9be615d6fa (patch) | |
tree | 9816285850868b7e22b73577526edb5fb6bdab9b | |
parent | a1f11c9e7c3cf5a1643013f13142a71f112d32e3 (diff) | |
download | ydb-db1b29e584d1ffb699262c4895caac9be615d6fa.tar.gz |
Apply new min huge blob size at runtime (#4016)
28 files changed, 463 insertions, 35 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 2a9dcf6a6ca..143f8d7169d 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -708,6 +708,7 @@ struct TEvBlobStorage { EvReplInvoke, EvStartBalancing, EvReplCheckProgress, + EvMinHugeBlobSizeUpdate, EvYardInitResult = EvPut + 9 * 512, /// 268 636 672 EvLogResult, diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h index f77ce94bb66..8a531e14a92 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h @@ -2,6 +2,8 @@ #include <ydb/core/base/hive.h> #include <ydb/core/blob_depot/blob_depot.h> +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/cms/console/console.h> #include <ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.h> #include <ydb/core/blobstorage/dsproxy/mock/model.h> #include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h> diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h index c71ac3c1e6f..98bf955f42b 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h @@ -40,7 +40,9 @@ struct TEnvironmentSetup { const TFeatureFlags FeatureFlags; const NPDisk::EDeviceType DiskType = NPDisk::EDeviceType::DEVICE_TYPE_NVME; const ui32 BurstThresholdNs = 0; + const ui32 MinHugeBlobInBytes = 0; const float DiskTimeAvailableScale = 1; + const bool UseFakeConfigDispatcher = false; }; const TSettings Settings; @@ -68,6 +70,51 @@ struct TEnvironmentSetup { } }; + + class TFakeConfigDispatcher : public TActor<TFakeConfigDispatcher> { + std::unordered_set<TActorId> Subscribers; + TActorId EdgeId; + public: + TFakeConfigDispatcher() + : TActor<TFakeConfigDispatcher>(&TFakeConfigDispatcher::StateWork) + { + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle); + hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle) + hFunc(NConsole::TEvConsole::TEvConfigNotificationResponse, Handle) + hFunc(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle) + } + } + + void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest::TPtr& ev) { + auto& items = ev->Get()->ConfigItemKinds; + if (items.at(0) != NKikimrConsole::TConfigItem::BlobStorageConfigItem) { + return; + } + Subscribers.emplace(ev->Sender); + } + + void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { + EdgeId = ev->Sender; + for (auto& id : Subscribers) { + auto update = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>(); + update->Record.CopyFrom(ev->Get()->Record); + Send(id, update.Release()); + } + } + + void Handle(NConsole::TEvConsole::TEvConfigNotificationResponse::TPtr& ev) { + Forward(ev, EdgeId); + } + + void Handle(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr& ev) { + Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release()); + } + }; + TEnvironmentSetup(bool vdiskReplPausedAtStart, TBlobStorageGroupType erasure = TBlobStorageGroupType::ErasureNone) : TEnvironmentSetup(TSettings{ .VDiskReplPausedAtStart = vdiskReplPausedAtStart, @@ -336,11 +383,22 @@ struct TEnvironmentSetup { type->SetDiskTimeAvailableScale(Settings.DiskTimeAvailableScale); } + { + auto* type = config->BlobStorageConfig.MutableVDiskPerformanceSettings()->AddVDiskTypes(); + type->SetPDiskType(PDiskTypeToPDiskType(Settings.DiskType)); + if (Settings.MinHugeBlobInBytes) { + type->SetMinHugeBlobSizeInBytes(Settings.MinHugeBlobInBytes); + } + } + warden.reset(CreateBSNodeWarden(config)); } const TActorId wardenId = Runtime->Register(warden.release(), nodeId); Runtime->RegisterService(MakeBlobStorageNodeWardenID(nodeId), wardenId); + if (Settings.UseFakeConfigDispatcher) { + Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(nodeId), Runtime->Register(new TFakeConfigDispatcher(), nodeId)); + } } } @@ -641,6 +699,16 @@ struct TEnvironmentSetup { }); } + void PutBlob(const ui32 groupId, const TLogoBlobID& blobId, const TString& part) { + TActorId edge = Runtime->AllocateEdgeActor(Settings.ControllerNodeId); + Runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(blobId, part, TInstant::Max(), + NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticMaxThroughput)); + }); + auto res = WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge); + Y_ABORT_UNLESS(res->Get()->Status == NKikimrProto::OK); + } + void CommenceReplication() { for (ui32 groupId : GetGroups()) { auto info = GetGroupInfo(groupId); diff --git a/ydb/core/blobstorage/ut_blobstorage/replication_huge.cpp b/ydb/core/blobstorage/ut_blobstorage/replication_huge.cpp new file mode 100644 index 00000000000..5f2124dce5d --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/replication_huge.cpp @@ -0,0 +1,119 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> +#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h> +#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h> +#include <util/system/info.h> + + +enum class EState { + OK, + FORMAT +}; + +struct TReplTestSettings { + ui32 BlobSize; + ui32 MinHugeBlobSize; + ui32 MinHugeBlobSizeInRepl; +}; + +void DoTestCase(const TReplTestSettings& settings) { + using E = EState; + std::vector<EState> states = {E::OK, E::FORMAT, E::FORMAT, E::OK, E::OK, E::OK, E::OK, E::OK}; + ui32 nodeCount = states.size(); + TEnvironmentSetup env(TEnvironmentSetup::TSettings{ + .NodeCount = nodeCount, + .Erasure = TBlobStorageGroupType::EErasureSpecies::Erasure4Plus2Block, + .ControllerNodeId = 1, + .MinHugeBlobInBytes = settings.MinHugeBlobSize, + .UseFakeConfigDispatcher = true, + }); + + env.CreateBoxAndPool(1, 1); + env.Sim(TDuration::Minutes(1)); + + auto groupId = env.GetGroups()[0]; + TString data = TString::Uninitialized(settings.BlobSize); + memset(data.Detach(), 1, data.size()); + TLogoBlobID id(1, 1, 1, 0, data.size(), 0); + env.PutBlob(groupId, id, data); + env.WaitForSync(env.GetGroupInfo(groupId), id); + + auto checkBlob = [&] { + TActorId edge = env.Runtime->AllocateEdgeActor(env.Settings.ControllerNodeId); + env.Runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead)); + }); + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(edge); + auto *msg = res->Get(); + Y_ABORT_UNLESS(msg->ResponseSz == 1); + return msg->Responses[0].Status; + }; + + Y_ABORT_UNLESS(checkBlob() == NKikimrProto::OK); + env.Cleanup(); + + for (auto& [key, state] : env.PDiskMockStates) { + if (states[key.first - 1] == E::FORMAT) { + state.Reset(); + } + } + + env.Initialize(); + + std::vector<std::pair<ui32, std::unique_ptr<IEventHandle>>> detainedMsgs; + env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == TEvBlobStorage::EvReplStarted) { + detainedMsgs.emplace_back(nodeId, std::move(ev)); + return false; + } + return true; + }; + + env.Sim(TDuration::Minutes(10)); + Y_ABORT_UNLESS(checkBlob() == NKikimrProto::OK); + Y_ABORT_IF(detainedMsgs.empty()); + + env.Runtime->FilterFunction = {}; + + // replication is about to start, updating the minHugeBlobSize in skeleton and resuming replication + for (auto& [nodeId, detainedEv] : detainedMsgs) { + TActorId edge = env.Runtime->AllocateEdgeActor(nodeId); + + auto request = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>(); + auto perfConfig = NKikimrConfig::TBlobStorageConfig_TVDiskPerformanceConfig(); + perfConfig.SetPDiskType(PDiskTypeToPDiskType(env.Settings.DiskType)); + perfConfig.SetMinHugeBlobSizeInBytes(settings.MinHugeBlobSizeInRepl); + auto* vdiskTypes = request->Record.MutableConfig()->MutableBlobStorageConfig()->MutableVDiskPerformanceSettings()->MutableVDiskTypes(); + vdiskTypes->Add(std::move(perfConfig)); + + env.Runtime->Send(new IEventHandle(NConsole::MakeConfigsDispatcherID(nodeId), edge, request.Release()), nodeId); + Y_ABORT_UNLESS(env.Runtime->WaitForEdgeActorEvent({edge})->CastAsLocal<NConsole::TEvConsole::TEvConfigNotificationResponse>()); + + env.Runtime->Send(detainedEv.release(), nodeId); + } + + // waiting for replication to complete + env.WaitForSync(env.GetGroupInfo(groupId), id); + + UNIT_ASSERT_EQUAL(checkBlob(), NKikimrProto::OK); +} + +Y_UNIT_TEST_SUITE(MinHugeChangeOnReplication) { + + Y_UNIT_TEST(MinHugeDecreased) { + DoTestCase(TReplTestSettings{ + .BlobSize = 200u << 10, + .MinHugeBlobSize = 64u << 10, + .MinHugeBlobSizeInRepl = 512u << 10, + }); + } + + Y_UNIT_TEST(MinHugeIncreased) { + DoTestCase(TReplTestSettings{ + .BlobSize = 200u << 10, + .MinHugeBlobSize = 512u << 10, + .MinHugeBlobSizeInRepl = 10u << 10, + }); + } + +} diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make index ae6c2101bc1..bd564dba019 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make @@ -10,6 +10,7 @@ TAG(ya:fat) SRCS( replication.cpp + replication_huge.cpp ) PEERDIR( diff --git a/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp b/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp index da5ea2e1604..d32b7bfa7ad 100644 --- a/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp +++ b/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp @@ -49,7 +49,7 @@ void ChaoticWriteRestartWrite(const TChaoticWriteRestartWriteSettings &settings, UNIT_ASSERT(success1); Conf.Shutdown(); - Conf.Prepare(settings.WriteRunSetup.get(), false); + Conf.Prepare(settings.SecondWriteRunSetup.get(), false); auto cls2 = std::make_shared<TPutHandleClassGenerator>(settings.Cls); TChaoticManyPutsTest x(settings.Parallel, 1, settings.MsgSize, cls2, settings.WorkingTime, settings.RequestTimeout); diff --git a/ydb/core/blobstorage/ut_vdisk/gen_restarts.h b/ydb/core/blobstorage/ut_vdisk/gen_restarts.h index 4f3e170c0de..70547e403f9 100644 --- a/ydb/core/blobstorage/ut_vdisk/gen_restarts.h +++ b/ydb/core/blobstorage/ut_vdisk/gen_restarts.h @@ -79,21 +79,37 @@ struct TMultiPutWriteRestartReadSettings { }; struct TChaoticWriteRestartWriteSettings : public TWriteRestartReadSettings { - // number of parallel writes/reads (ignored if just single read/write) + // number of parallel writes/reads (ignored if just single read/write)] + const std::shared_ptr<IVDiskSetup> SecondWriteRunSetup; const ui32 Parallel; const TDuration WorkingTime; const TDuration RequestTimeout; TChaoticWriteRestartWriteSettings( const TWriteRestartReadSettings &baseSettings, + std::shared_ptr<IVDiskSetup> secondWriteRunSetup, ui32 parallel, TDuration workingTime, TDuration requestTimeout) : TWriteRestartReadSettings(baseSettings) + , SecondWriteRunSetup(secondWriteRunSetup) , Parallel(parallel) , WorkingTime(workingTime) , RequestTimeout(requestTimeout) {} + + TChaoticWriteRestartWriteSettings( + const TWriteRestartReadSettings &baseSettings, + ui32 parallel, + TDuration workingTime, + TDuration requestTimeout) + : TWriteRestartReadSettings(baseSettings) + , SecondWriteRunSetup(baseSettings.WriteRunSetup) + , Parallel(parallel) + , WorkingTime(workingTime) + , RequestTimeout(requestTimeout) + {} + }; diff --git a/ydb/core/blobstorage/ut_vdisk/lib/setup.h b/ydb/core/blobstorage/ut_vdisk/lib/setup.h index 5b7dd9089e2..cc4a2e2da2f 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/setup.h +++ b/ydb/core/blobstorage/ut_vdisk/lib/setup.h @@ -24,6 +24,15 @@ struct TFastVDiskSetup : public TDefaultVDiskSetup { } }; +struct TFastVDiskSetupMinHugeBlob : public TFastVDiskSetup { + TFastVDiskSetupMinHugeBlob(ui32 minHugeBlobInBytes) { + auto modifier = [=] (NKikimr::TVDiskConfig *cfg) { + cfg->MinHugeBlobInBytes = minHugeBlobInBytes; + }; + AddConfigModifier(modifier); + } +}; + struct TFastVDiskSetupSmallVDiskQueues : public TFastVDiskSetup { TFastVDiskSetupSmallVDiskQueues() { auto modifier = [] (NKikimr::TVDiskConfig *cfg) { diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp index 184cfba946e..518c675b8af 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp @@ -206,6 +206,7 @@ private: nullptr, nullptr, // PDiskCtx nullptr, // HugeBlobCtx + 4097, nullptr, MakeIntrusive<TBlobStorageGroupInfo>(groupInfo), ctx.SelfID, diff --git a/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp b/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp index 8ec7fb3acef..422b139e9fd 100644 --- a/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp +++ b/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp @@ -471,6 +471,20 @@ Y_UNIT_TEST_SUITE(TBsLocalRecovery) { WriteRestartRead(settings, TIMEOUT); } + Y_UNIT_TEST(WriteRestartReadHugeIncreased) { + auto vdiskWriteSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u); + auto vdiskReadSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60u << 10u); + auto settings = TWriteRestartReadSettings(1000, 20u << 10u, HUGEB, vdiskWriteSetup, vdiskReadSetup); + WriteRestartRead(settings, TIMEOUT); + } + + Y_UNIT_TEST(WriteRestartReadHugeDecreased) { + auto vdiskWriteSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60 << 10u); + auto vdiskReadSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u); + auto settings = TWriteRestartReadSettings(1000, 20u << 10u, HUGEB, vdiskWriteSetup, vdiskReadSetup); + WriteRestartRead(settings, TIMEOUT); + } + Y_UNIT_TEST(MultiPutWriteRestartRead) { auto vdiskSetup = std::make_shared<TFastVDiskSetup>(); auto settings = TMultiPutWriteRestartReadSettings::OneSetup(1000, 10, 10, UNK, vdiskSetup); @@ -547,6 +561,30 @@ Y_UNIT_TEST_SUITE(TBsLocalRecovery) { TDuration::Seconds(0)); ChaoticWriteRestartWrite(settings, TIMEOUT); } + + Y_UNIT_TEST(ChaoticWriteRestartHugeIncreased) { + auto vdiskSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u); + auto vdiskSecondSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60u << 10u); + TChaoticWriteRestartWriteSettings settings( + TWriteRestartReadSettings::OneSetup(300, 20u << 10u, HUGEB, vdiskSetup), + vdiskSecondSetup, + 500, + TDuration::Seconds(10), + TDuration::Seconds(0)); + ChaoticWriteRestartWrite(settings, TIMEOUT); + } + + Y_UNIT_TEST(ChaoticWriteRestartHugeDecreased) { + auto vdiskSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60u << 10u); + auto vdiskSecondSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u); + TChaoticWriteRestartWriteSettings settings( + TWriteRestartReadSettings::OneSetup(300, 20u << 10u, HUGEB, vdiskSetup), + vdiskSecondSetup, + 500, + TDuration::Seconds(10), + TDuration::Seconds(0)); + ChaoticWriteRestartWrite(settings, TIMEOUT); + } } /////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/ut_vdisk2/defs.h b/ydb/core/blobstorage/ut_vdisk2/defs.h index 13e976d338c..5359b1892e1 100644 --- a/ydb/core/blobstorage/ut_vdisk2/defs.h +++ b/ydb/core/blobstorage/ut_vdisk2/defs.h @@ -5,5 +5,7 @@ #include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_config.h> #include <ydb/core/blobstorage/vdisk/vdisk_actor.h> +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/cms/console/console.h> #include <ydb/core/util/testactorsys.h> #include <library/cpp/testing/unittest/registar.h> diff --git a/ydb/core/blobstorage/ut_vdisk2/env.h b/ydb/core/blobstorage/ut_vdisk2/env.h index f1019a87408..e15da631531 100644 --- a/ydb/core/blobstorage/ut_vdisk2/env.h +++ b/ydb/core/blobstorage/ut_vdisk2/env.h @@ -21,6 +21,50 @@ namespace NKikimr { TIntrusivePtr<TPDiskMockState> PDiskMockState; std::unordered_map<NKikimrBlobStorage::EVDiskQueueId, TActorId> QueueIds; + class TFakeConfigDispatcher : public TActor<TFakeConfigDispatcher> { + std::unordered_set<TActorId> Subscribers; + TActorId EdgeId; + public: + TFakeConfigDispatcher() + : TActor<TFakeConfigDispatcher>(&TFakeConfigDispatcher::StateWork) + { + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle); + hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle) + hFunc(NConsole::TEvConsole::TEvConfigNotificationResponse, Handle) + hFunc(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle) + } + } + + void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest::TPtr& ev) { + auto& items = ev->Get()->ConfigItemKinds; + if (items.at(0) != NKikimrConsole::TConfigItem::BlobStorageConfigItem) { + return; + } + Subscribers.emplace(ev->Sender); + } + + void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { + EdgeId = ev->Sender; + for (auto& id : Subscribers) { + auto update = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>(); + update->Record.CopyFrom(ev->Get()->Record); + Send(id, update.Release()); + } + } + + void Handle(NConsole::TEvConsole::TEvConfigNotificationResponse::TPtr& ev) { + Forward(ev, EdgeId); + } + + void Handle(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr& ev) { + Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release()); + } + }; + public: TTestEnv(TIntrusivePtr<TPDiskMockState> state = nullptr) : Runtime(std::make_unique<TTestActorSystem>(1)) @@ -31,6 +75,7 @@ namespace NKikimr { SetupLogging(); Runtime->Start(); CreatePDisk(); + CreateConfigDispatcher(); CreateVDisk(); CreateQueues(); } @@ -78,6 +123,23 @@ namespace NKikimr { NKikimrBlobStorage::EVDiskQueueId::PutTabletLog); } + void ChangeMinHugeBlobSize(ui32 minHugeBlobSize) { + const TActorId& edge = Runtime->AllocateEdgeActor(NodeId); + auto request = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>(); + auto perfConfig = NKikimrConfig::TBlobStorageConfig_TVDiskPerformanceConfig(); + perfConfig.SetPDiskType(PDiskTypeToPDiskType(VDiskConfig->BaseInfo.DeviceType)); + perfConfig.SetMinHugeBlobSizeInBytes(minHugeBlobSize); + + auto* vdiskTypes = request->Record.MutableConfig()->MutableBlobStorageConfig()->MutableVDiskPerformanceSettings()->MutableVDiskTypes(); + vdiskTypes->Add(std::move(perfConfig)); + + Runtime->Send(new IEventHandle(NConsole::MakeConfigsDispatcherID(NodeId), edge, request.Release()), NodeId); + auto ev = Runtime->WaitForEdgeActorEvent({edge}); + Runtime->DestroyActor(edge); + auto *msg = ev->CastAsLocal<NConsole::TEvConsole::TEvConfigNotificationResponse>(); + UNIT_ASSERT(msg); + } + private: template<typename TEvVResult> decltype(std::declval<TEvVResult>().Record) ExecuteQuery(std::unique_ptr<IEventBase> query, @@ -126,6 +188,10 @@ namespace NKikimr { } } + void CreateConfigDispatcher() { + Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(NodeId), Runtime->Register(new TFakeConfigDispatcher(), NodeId)); + } + static NKikimrBlobStorage::EVDiskQueueId GetQueueId(NKikimrBlobStorage::EPutHandleClass prio) { switch (prio) { case NKikimrBlobStorage::EPutHandleClass::TabletLog: diff --git a/ydb/core/blobstorage/ut_vdisk2/huge.cpp b/ydb/core/blobstorage/ut_vdisk2/huge.cpp index 83fd45e3358..153a2a8e366 100644 --- a/ydb/core/blobstorage/ut_vdisk2/huge.cpp +++ b/ydb/core/blobstorage/ut_vdisk2/huge.cpp @@ -13,7 +13,9 @@ Y_UNIT_TEST_SUITE(VDiskTest) { char value = 1; std::vector<TString> blobValues; - for (const ui32 size : {10, 1024, 576 * 1024, 1024 * 1024, 1536 * 1024}) { + std::vector<ui32> minHugeBlobValues = {8 * 1024, 12 * 1024, 60 * 1024, 64 * 1024, 512 * 1024}; + + for (const ui32 size : {10, 1024, 40 * 1024, 576 * 1024, 1024 * 1024, 1536 * 1024}) { for (ui32 i = 0; i < 10; ++i) { TString data = TString::Uninitialized(size); memset(data.Detach(), value++, data.size()); @@ -53,6 +55,7 @@ Y_UNIT_TEST_SUITE(VDiskTest) { ui64 minTotalSize = (ui64)4 << 30; ui64 totalSize = 0; ui8 channel = 0; + ui32 lastMinHugeBlobValue = 0; while (TInstant::Now() < end) { const ui64 tabletId = tabletIds[RandomNumber(tabletIds.size())]; @@ -68,6 +71,16 @@ Y_UNIT_TEST_SUITE(VDiskTest) { content.emplace(id, &data); totalSize += data.size(); + if (RandomNumber(1000u) < 3) { + ui32 minHugeBlobValue; + do { + minHugeBlobValue = minHugeBlobValues[RandomNumber(minHugeBlobValues.size())]; + } while (minHugeBlobValue == lastMinHugeBlobValue); + lastMinHugeBlobValue = minHugeBlobValue; + env->ChangeMinHugeBlobSize(minHugeBlobValue); + Cerr << "Change MinHugeBlobSize# " << minHugeBlobValue << Endl; + } + if (totalSize > maxTotalSize || (totalSize >= minTotalSize && RandomNumber(1000u) < 3)) { std::vector<TLogoBlobID> options; options.reserve(content.size()); diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 1daaca12dea..b714f2199e2 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -3121,4 +3121,14 @@ namespace NKikimr { struct TEvPermitGarbageCollection : TEventLocal<TEvPermitGarbageCollection, TEvBlobStorage::EvPermitGarbageCollection> {}; + //////////////////////////////////////////////////////////////////////////// + // TEvMinHugeBlobSizeUpdate + //////////////////////////////////////////////////////////////////////////// + class TEvMinHugeBlobSizeUpdate : public TEventLocal<TEvMinHugeBlobSizeUpdate, TEvBlobStorage::EvMinHugeBlobSizeUpdate> { + public: + ui32 MinREALHugeBlobInBytes; + + TEvMinHugeBlobSizeUpdate(ui32 minREALHugeBlobInBytes) : MinREALHugeBlobInBytes(minREALHugeBlobInBytes) { + }; + }; } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp index 89bb2d018ea..f8d4b2fec47 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp @@ -16,6 +16,12 @@ namespace NKikimr { return &AllSlotsInfo.at(idx); } + ui32 THugeSlotsMap::AlignByBlockSize(ui32 size) const { + ui32 sizeInBlocks = size / AppendBlockSize; + Y_ABORT_UNLESS(sizeInBlocks, "Blob size to align is smaller than a single block. BlobSize# %" PRIu32, size); + return sizeInBlocks * AppendBlockSize; + } + void THugeSlotsMap::Output(IOutputStream &str) const { str << "{AllSlotsInfo# [\n"; for (const auto &x : AllSlotsInfo) { @@ -42,8 +48,8 @@ namespace NKikimr { } // check whether this blob is huge one; userPartSize doesn't include any metadata stored along with blob - bool THugeBlobCtx::IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const { - return gtype.MaxPartSize(fullId) + (AddHeader ? TDiskBlob::HeaderSize : 0) >= MinREALHugeBlobInBytes; + bool THugeBlobCtx::IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, ui32 minREALHugeBlobInBytes) const { + return gtype.MaxPartSize(fullId) + (AddHeader ? TDiskBlob::HeaderSize : 0) >= minREALHugeBlobInBytes; } } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h index 9045fce089e..1c08c0be927 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h @@ -49,6 +49,7 @@ namespace NKikimr { THugeSlotsMap(ui32 appendBlockSize, TAllSlotsInfo &&slotsInfo, TSearchTable &&searchTable); const TSlotInfo *GetSlotInfo(ui32 size) const; + ui32 AlignByBlockSize(ui32 size) const; void Output(IOutputStream &str) const; TString ToString() const; @@ -64,19 +65,16 @@ namespace NKikimr { class THugeBlobCtx { public: // this value is multiply of AppendBlockSize and is calculated from Config->MinHugeBlobSize - const ui32 MinREALHugeBlobInBytes; const std::shared_ptr<const THugeSlotsMap> HugeSlotsMap; const bool AddHeader; // check whether this NEW blob is huge one; userPartSize doesn't include any metadata stored along with blob - bool IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const; + bool IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, ui32 minREALHugeBlobInBytes) const; - THugeBlobCtx(ui32 minHugeBlobInBytes, ui32 appendBlockSize, const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap, bool addHeader) - : MinREALHugeBlobInBytes(minHugeBlobInBytes / appendBlockSize * appendBlockSize + 1) - , HugeSlotsMap(hugeSlotsMap) + THugeBlobCtx(const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap, bool addHeader) + : HugeSlotsMap(hugeSlotsMap) , AddHeader(addHeader) { - Y_ABORT_UNLESS(MinREALHugeBlobInBytes >= appendBlockSize); } }; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h index d819080ca78..28fa9e6f37d 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h @@ -520,7 +520,11 @@ public: } void MinHugeBlobInBytes(ui32 size) { + if (PrevMinHugeBlobInBytes) { + GroupCounters->GetNamedCounter("MinHugeBlobInBytes", ToString(PrevMinHugeBlobInBytes), false)->Dec(); + } GroupCounters->GetNamedCounter("MinHugeBlobInBytes", ToString(size), false)->Inc(); + PrevMinHugeBlobInBytes = size; } COUNTER_DEF(MovedPatchMsgs); @@ -564,6 +568,8 @@ public: COUNTER_DEF(PutTotalBytes); COUNTER_DEF(GetTotalBytes); + private: + ui32 PrevMinHugeBlobInBytes = 0; }; /////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp index f87c1a105b6..df27d118e98 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp @@ -35,8 +35,6 @@ namespace NKikimr { logFunc); return std::make_shared<THugeBlobCtx>( - ChunkSize, - AppendBlockSize, repairedHuge->Heap->BuildHugeSlotsMap(), true); } diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp index cbdfb3f510a..0aa301c7e3e 100644 --- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp +++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp @@ -448,8 +448,6 @@ namespace NKikimr { lsn, entryPoint, logFunc); } HugeBlobCtx = std::make_shared<THugeBlobCtx>( - Config->MinHugeBlobInBytes, - LocRecCtx->PDiskCtx->Dsk->AppendBlockSize, LocRecCtx->RepairedHuge->Heap->BuildHugeSlotsMap(), Config->AddHeader); HugeKeeperInitialized = true; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp index 208672f6067..b1eeba13a82 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp @@ -14,7 +14,7 @@ std::shared_ptr<TReplCtx> CreateReplCtx(TVector<TVDiskID>& vdisks, const TIntrus auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); auto vctx = MakeIntrusive<TVDiskContext>(TActorId(), info->PickTopology(), counters, TVDiskID(0, 1, 0, 0, 0), nullptr, NPDisk::DEVICE_TYPE_UNKNOWN); - auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(512u << 10u, 4096u, nullptr, true); + auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(nullptr, true); auto dsk = MakeIntrusive<TPDiskParams>(ui8(1), 1u, 128u << 20, 4096u, 0u, 1000000000u, 1000000000u, 65536u, 65536u, 65536u); auto pdiskCtx = std::make_shared<TPDiskCtx>(dsk, TActorId(), TString()); auto replCtx = std::make_shared<TReplCtx>( @@ -22,6 +22,7 @@ std::shared_ptr<TReplCtx> CreateReplCtx(TVector<TVDiskID>& vdisks, const TIntrus nullptr, pdiskCtx, hugeBlobCtx, + 4097, nullptr, info, TActorId(), diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp index e48f236ba3c..bdec548c93e 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp @@ -156,6 +156,7 @@ namespace NKikimr { }; std::shared_ptr<TReplCtx> ReplCtx; + ui32 NextMinREALHugeBlobInBytes; THistory History; EState State; TInstant LastReplStart; @@ -252,6 +253,10 @@ namespace NKikimr { } } + void Handle(TEvMinHugeBlobSizeUpdate::TPtr ev) { + NextMinREALHugeBlobInBytes = ev->Get()->MinREALHugeBlobInBytes; + } + void StartReplication() { STLOG(PRI_DEBUG, BS_REPL, BSVR14, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "REPL START")); STLOG(PRI_DEBUG, BS_REPL, BSVR15, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "QUANTUM START")); @@ -262,6 +267,8 @@ namespace NKikimr { ReplCtx->MonGroup.ReplWorkUnitsDone() = 0; ReplCtx->MonGroup.ReplItemsRemaining() = 0; ReplCtx->MonGroup.ReplItemsDone() = 0; + Y_ABORT_UNLESS(NextMinREALHugeBlobInBytes); + ReplCtx->MinREALHugeBlobInBytes = NextMinREALHugeBlobInBytes; UnrecoveredNonphantomBlobs = false; Become(&TThis::StateRepl); @@ -609,6 +616,7 @@ namespace NKikimr { cFunc(TEvBlobStorage::EvCommenceRepl, StartReplication) hFunc(TEvReplInvoke, Handle) hFunc(TEvReplCheckProgress, ReplProgressWatchdog) + hFunc(TEvMinHugeBlobSizeUpdate, Handle) ) void PassAway() override { @@ -652,6 +660,7 @@ namespace NKikimr { cFunc(TEvBlobStorage::EvCommenceRepl, Ignore) hFunc(TEvReplInvoke, Handle) hFunc(TEvReplCheckProgress, ReplProgressWatchdog) + hFunc(TEvMinHugeBlobSizeUpdate, Handle) ) public: @@ -662,6 +671,7 @@ namespace NKikimr { TReplScheduler(std::shared_ptr<TReplCtx> &replCtx) : TActorBootstrapped<TReplScheduler>() , ReplCtx(replCtx) + , NextMinREALHugeBlobInBytes(ReplCtx->MinREALHugeBlobInBytes) , History(HistorySize) , State(Relaxation) , ReplProgressWatchdog( diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h index 8795969ae13..a8ac714d52c 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h @@ -15,6 +15,7 @@ namespace NKikimr { TIntrusivePtr<THullCtx> HullCtx; TPDiskCtxPtr PDiskCtx; std::shared_ptr<THugeBlobCtx> HugeBlobCtx; + ui32 MinREALHugeBlobInBytes; TIntrusivePtr<THullDs> HullDs; TIntrusivePtr<TBlobStorageGroupInfo> GInfo; TActorId SkeletonId; @@ -30,6 +31,7 @@ namespace NKikimr { TIntrusivePtr<THullCtx> hullCtx, TPDiskCtxPtr pdiskCtx, std::shared_ptr<THugeBlobCtx> hugeBlobCtx, + ui32 minREALHugeBlobInBytes, TIntrusivePtr<THullDs> hullDs, TIntrusivePtr<TBlobStorageGroupInfo> info, const TActorId &skeletonId, @@ -40,6 +42,7 @@ namespace NKikimr { , HullCtx(std::move(hullCtx)) , PDiskCtx(std::move(pdiskCtx)) , HugeBlobCtx(std::move(hugeBlobCtx)) + , MinREALHugeBlobInBytes(minREALHugeBlobInBytes) , HullDs(std::move(hullDs)) , GInfo(std::move(info)) , SkeletonId(skeletonId) @@ -47,7 +50,9 @@ namespace NKikimr { , VDiskCfg(std::move(vdiskCfg)) , PDiskWriteBytes(std::move(pdiskWriteBytes)) , PausedAtStart(pausedAtStart) - {} + { + Y_ABORT_UNLESS(MinREALHugeBlobInBytes); + } bool GetAddHeader() const { return !HullCtx || HullCtx->AddHeader; } }; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h index ea41a013e12..2c8e20602d8 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h @@ -204,7 +204,7 @@ namespace NKikimr { partsSize += partSize; TRope& data = item.Parts[i]; Y_ABORT_UNLESS(data.GetSize() == partSize); - if (ReplCtx->HugeBlobCtx->IsHugeBlob(groupType, id)) { + if (ReplCtx->HugeBlobCtx->IsHugeBlob(groupType, id, ReplCtx->MinREALHugeBlobInBytes)) { AddBlobToQueue(partId, TDiskBlob::Create(id.BlobSize(), i + 1, groupType.TotalPartCount(), std::move(data), Arena, ReplCtx->GetAddHeader()), {}, true, rbq); ++numHuge; @@ -378,7 +378,7 @@ namespace NKikimr { void RecoverMetadata(const TLogoBlobID& id, TRecoveredBlobsQueue& rbq) { while (!MetadataParts.empty() && MetadataParts.front().FullID() <= id) { const TLogoBlobID id = MetadataParts.front(); - const bool isHugeBlob = ReplCtx->HugeBlobCtx->IsHugeBlob(ReplCtx->VCtx->Top->GType, id.FullID()); + const bool isHugeBlob = ReplCtx->HugeBlobCtx->IsHugeBlob(ReplCtx->VCtx->Top->GType, id.FullID(), ReplCtx->MinREALHugeBlobInBytes); MetadataParts.pop_front(); STLOG(PRI_DEBUG, BS_REPL, BSVR30, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "TRecoveryMachine::RecoverMetadata"), (BlobId, id)); diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp index c5b434eb774..fcc5e1c923e 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp @@ -66,12 +66,13 @@ namespace NKikimr { auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); auto vctx = MakeIntrusive<TVDiskContext>(TActorId(), info->PickTopology(), counters, TVDiskID(0, 1, 0, 0, 0), nullptr, NPDisk::DEVICE_TYPE_UNKNOWN); - auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(512u << 10u, 4096u, nullptr, true); + auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(nullptr, true); auto replCtx = std::make_shared<TReplCtx>( vctx, nullptr, // HullCtx nullptr, // PDiskCtx hugeBlobCtx, + 4097, nullptr, info, TActorId(), diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 2285d725101..632faf21a80 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -44,6 +44,8 @@ #include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h> #include <ydb/core/blobstorage/vdisk/defrag/defrag_actor.h> #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_internal_interface.h> +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/cms/console/console.h> #include <ydb/core/protos/node_whiteboard.pb.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <library/cpp/monlib/service/pages/templates.h> @@ -173,6 +175,41 @@ namespace NKikimr { } } + void Handle(const NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr &ev, const TActorContext &ctx) { + auto& record = ev->Get()->Record; + ctx.Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(record), 0, ev->Cookie); + if (!record.HasConfig() ) { + return; + } + if (const auto& config = record.GetConfig(); config.HasBlobStorageConfig() && config.GetBlobStorageConfig().HasVDiskPerformanceSettings()) { + for (auto &type : config.GetBlobStorageConfig().GetVDiskPerformanceSettings().GetVDiskTypes()) { + if (!type.HasPDiskType() || Config->BaseInfo.DeviceType != PDiskTypeToPDiskType(type.GetPDiskType())) { + continue; + } + if (!type.HasMinHugeBlobSizeInBytes()) { + continue; + } + if (!ApplyHugeBlobSize(type.GetMinHugeBlobSizeInBytes())) { + continue; + } + if (Config->RunRepl) { + ctx.Send(Db->ReplID, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes)); + } + ctx.Send(*SkeletonFrontIDPtr, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes)); + } + } + } + + bool ApplyHugeBlobSize(ui32 minHugeBlobInBytes) { + ui32 alignedSize = HugeBlobCtx->HugeSlotsMap->AlignByBlockSize(minHugeBlobInBytes) + 1; + if (MinREALHugeBlobInBytes == alignedSize) { + return false; + } + MinREALHugeBlobInBytes = alignedSize; + IFaceMonGroup->MinHugeBlobInBytes(MinREALHugeBlobInBytes); + return true; + } + //////////////////////////////////////////////////////////////////////// // SEND REPLY //////////////////////////////////////////////////////////////////////// @@ -511,7 +548,7 @@ namespace NKikimr { TVPutInfo &info = putsInfo.back(); try { - info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, blobId.FullID()); + info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, blobId.FullID(), MinREALHugeBlobInBytes); if (info.IsHugeBlob) { LOG_CRIT_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << "TEvVMultiPut: TEvVMultiPut has huge blob# " << blobId << " Marker# BSVS08"); @@ -654,7 +691,7 @@ namespace NKikimr { const ui64 bufSize = info.Buffer.GetSize(); try { - info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID()); + info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID(), MinREALHugeBlobInBytes); } catch (yexception ex) { LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS41"); info.HullStatus = {NKikimrProto::ERROR, 0, false}; @@ -1630,8 +1667,8 @@ namespace NKikimr { TRope buf = std::move(msg->Data); const ui64 bufSize = buf.GetSize(); - Y_ABORT_UNLESS(bufSize <= Config->MaxLogoBlobDataSize && HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID()), - "TEvRecoveredHugeBlob: blob is too small/huge bufSize# %zu", bufSize); + Y_ABORT_UNLESS(bufSize <= Config->MaxLogoBlobDataSize, + "TEvRecoveredHugeBlob: blob is huge bufSize# %zu", bufSize); UpdatePDiskWriteBytes(bufSize); auto oosStatus = VCtx->GetOutOfSpaceState().GetGlobalStatusFlags(); @@ -1716,9 +1753,14 @@ namespace NKikimr { auto msg = std::make_unique<TEvFrontRecoveryStatus>(TEvFrontRecoveryStatus::SyncGuidRecoveryDone, NKikimrProto::OK, (PDiskCtx ? PDiskCtx->Dsk : nullptr), - HugeBlobCtx, + MinREALHugeBlobInBytes, Db->GetVDiskIncarnationGuid()); ctx.Send(*SkeletonFrontIDPtr, msg.release()); + ctx.Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest( + NKikimrConsole::TConfigItem::BlobStorageConfigItem, + SelfId() + )); + Hull->PermitGarbageCollection(ctx); // propagate status to Node Warden unless replication is on -- in that case it sets the status itself if (!runRepl) { @@ -1738,7 +1780,7 @@ namespace NKikimr { auto msg = std::make_unique<TEvFrontRecoveryStatus>(phase, NKikimrProto::ERROR, (PDiskCtx ? PDiskCtx->Dsk : nullptr), - HugeBlobCtx, + MinREALHugeBlobInBytes, Db->GetVDiskIncarnationGuid()); ctx.Send(*SkeletonFrontIDPtr, msg.release()); // push the status @@ -1785,7 +1827,7 @@ namespace NKikimr { // check status if (ev->Get()->Status == NKikimrProto::OK) { - IFaceMonGroup->MinHugeBlobInBytes(HugeBlobCtx->MinREALHugeBlobInBytes); + ApplyHugeBlobSize(Config->MinHugeBlobInBytes); // handle special case when donor disk starts and finds out that it has been wiped out if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) { // send drop donor cmd to NodeWarden @@ -1802,7 +1844,7 @@ namespace NKikimr { auto msg = std::make_unique<TEvFrontRecoveryStatus>(TEvFrontRecoveryStatus::LocalRecoveryDone, NKikimrProto::OK, PDiskCtx->Dsk, - HugeBlobCtx, + MinREALHugeBlobInBytes, Db->GetVDiskIncarnationGuid()); ctx.Send(*SkeletonFrontIDPtr, msg.release()); @@ -1965,7 +2007,7 @@ namespace NKikimr { DbBirthLsn = ev->Get()->DbBirthLsn; SkeletonIsUpAndRunning(ctx, Config->RunRepl); if (Config->RunRepl) { - auto replCtx = std::make_shared<TReplCtx>(VCtx, HullCtx, PDiskCtx, HugeBlobCtx, Hull->GetHullDs(), + auto replCtx = std::make_shared<TReplCtx>(VCtx, HullCtx, PDiskCtx, HugeBlobCtx, MinREALHugeBlobInBytes, Hull->GetHullDs(), GInfo, SelfId(), Config, PDiskWriteBytes, Config->ReplPausedAtStart); Db->ReplID.Set(ctx.Register(CreateReplActor(replCtx))); ActiveActors.Insert(Db->ReplID, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); // keep forever @@ -2353,6 +2395,13 @@ namespace NKikimr { TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvCommenceRepl, 0, Db->ReplID, SelfId(), nullptr, 0)); } } + + void PassAway() override { + Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest( + SelfId() + )); + TActor::PassAway(); + } void ForwardToScrubActor(STFUNC_SIG) { Forward(ev, ScrubId); @@ -2595,6 +2644,8 @@ namespace NKikimr { ) STRICT_STFUNC(StateNormal, + IgnoreFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse); + HFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle); HFunc(TEvBlobStorage::TEvVMovedPatch, Handle) HFunc(TEvBlobStorage::TEvVPatchStart, Handle) HFunc(TEvBlobStorage::TEvVPatchDiff, HandleVPatchDiffResending) @@ -2741,6 +2792,7 @@ namespace NKikimr { THullCtxPtr HullCtx; THugeBlobCtxPtr HugeBlobCtx; std::shared_ptr<THullLogCtx> HullLogCtx; + ui32 MinREALHugeBlobInBytes; std::shared_ptr<THull> Hull; // run it after local recovery std::shared_ptr<TOutOfSpaceLogic> OutOfSpaceLogic; std::shared_ptr<TQueryCtx> QueryCtx; diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index 49e439a054a..c2a83310e96 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -41,12 +41,12 @@ namespace NKikimr { TEvFrontRecoveryStatus::TEvFrontRecoveryStatus(EPhase phase, NKikimrProto::EReplyStatus status, const TIntrusivePtr<TPDiskParams> &dsk, - std::shared_ptr<THugeBlobCtx> hugeBlobCtx, + ui32 minREALHugeBlobInBytes, TVDiskIncarnationGuid vdiskIncarnationGuid) : Phase(phase) , Status(status) , Dsk(dsk) - , HugeBlobCtx(std::move(hugeBlobCtx)) + , MinREALHugeBlobInBytes(minREALHugeBlobInBytes) , VDiskIncarnationGuid(vdiskIncarnationGuid) {} @@ -819,7 +819,7 @@ namespace NKikimr { TBlobStorageGroupType type = (GInfo ? GInfo->Type : TErasureType::ErasureNone); VCtx->UpdateCostModel(std::make_unique<TCostModel>(msg->Dsk->SeekTimeUs, msg->Dsk->ReadSpeedBps, msg->Dsk->WriteSpeedBps, msg->Dsk->ReadBlockSize, msg->Dsk->WriteBlockSize, - msg->HugeBlobCtx->MinREALHugeBlobInBytes, type)); + msg->MinREALHugeBlobInBytes, type)); break; } case TEvFrontRecoveryStatus::SyncGuidRecoveryDone: @@ -831,6 +831,12 @@ namespace NKikimr { } } + void Handle(TEvMinHugeBlobSizeUpdate::TPtr &ev) { + VCtx->UpdateCostModel(std::make_unique<TCostModel>(VCtx->CostModel->SeekTimeUs, VCtx->CostModel->ReadSpeedBps, + VCtx->CostModel->WriteSpeedBps, VCtx->CostModel->ReadBlockSize, VCtx->CostModel->WriteBlockSize, + ev->Get()->MinREALHugeBlobInBytes, VCtx->CostModel->GType)); + } + static NKikimrWhiteboard::EFlag ToLightSignal(NKikimrWhiteboard::EVDiskState st) { switch (st) { case NKikimrWhiteboard::EVDiskState::Initial: return NKikimrWhiteboard::EFlag::Yellow; @@ -2063,6 +2069,7 @@ namespace NKikimr { HFunc(TEvReportScrubStatus, Handle) HFunc(NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate, Handle) fFunc(TEvBlobStorage::EvForwardToSkeleton, HandleForwardToSkeleton) + hFunc(TEvMinHugeBlobSizeUpdate, Handle) ) #define HFuncStatus(TEvType, status, errorReason, now, wstatus) \ diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h index b5ef914ef06..d5a7c13ad6d 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h @@ -10,7 +10,6 @@ namespace NKikimr { struct TPDiskParams; class TBlobStorageGroupInfo; - class THugeBlobCtx; //////////////////////////////////////////////////////////////////////////// // TEvFrontRecoveryStatus @@ -25,13 +24,13 @@ namespace NKikimr { const EPhase Phase; const NKikimrProto::EReplyStatus Status; const TIntrusivePtr<TPDiskParams> Dsk; - const std::shared_ptr<THugeBlobCtx> HugeBlobCtx; + ui32 MinREALHugeBlobInBytes; const TVDiskIncarnationGuid VDiskIncarnationGuid; TEvFrontRecoveryStatus(EPhase phase, NKikimrProto::EReplyStatus status, const TIntrusivePtr<TPDiskParams> &dsk, - std::shared_ptr<THugeBlobCtx> hugeBlobCtx, + ui32 MinREALHugeBlobInBytes, TVDiskIncarnationGuid vdiskIncarnationGuid); ~TEvFrontRecoveryStatus(); }; diff --git a/ydb/core/cms/console/configs_dispatcher.cpp b/ydb/core/cms/console/configs_dispatcher.cpp index 395f70fb88d..be2c90555e5 100644 --- a/ydb/core/cms/console/configs_dispatcher.cpp +++ b/ydb/core/cms/console/configs_dispatcher.cpp @@ -64,6 +64,7 @@ const THashSet<ui32> DYNAMIC_KINDS({ (ui32)NKikimrConsole::TConfigItem::AllowEditYamlInUiItem, (ui32)NKikimrConsole::TConfigItem::BackgroundCleaningConfigItem, (ui32)NKikimrConsole::TConfigItem::TracingConfigItem, + (ui32)NKikimrConsole::TConfigItem::BlobStorageConfigItem, }); const THashSet<ui32> NON_YAML_KINDS({ |