aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Berlin <aberlin@ydb.tech>2024-04-26 18:52:06 +0300
committerGitHub <noreply@github.com>2024-04-26 18:52:06 +0300
commitdb1b29e584d1ffb699262c4895caac9be615d6fa (patch)
tree9816285850868b7e22b73577526edb5fb6bdab9b
parenta1f11c9e7c3cf5a1643013f13142a71f112d32e3 (diff)
downloadydb-db1b29e584d1ffb699262c4895caac9be615d6fa.tar.gz
Apply new min huge blob size at runtime (#4016)
-rw-r--r--ydb/core/base/blobstorage.h1
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/defs.h2
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/lib/env.h68
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/replication_huge.cpp119
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make1
-rw-r--r--ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp2
-rw-r--r--ydb/core/blobstorage/ut_vdisk/gen_restarts.h18
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/setup.h9
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp1
-rw-r--r--ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp38
-rw-r--r--ydb/core/blobstorage/ut_vdisk2/defs.h2
-rw-r--r--ydb/core/blobstorage/ut_vdisk2/env.h66
-rw-r--r--ydb/core/blobstorage/ut_vdisk2/huge.cpp15
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h10
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp10
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h10
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h6
-rw-r--r--ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp3
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp10
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h7
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h4
-rw-r--r--ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp3
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp70
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp13
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h5
-rw-r--r--ydb/core/cms/console/configs_dispatcher.cpp1
28 files changed, 463 insertions, 35 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 2a9dcf6a6ca..143f8d7169d 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -708,6 +708,7 @@ struct TEvBlobStorage {
EvReplInvoke,
EvStartBalancing,
EvReplCheckProgress,
+ EvMinHugeBlobSizeUpdate,
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
index f77ce94bb66..8a531e14a92 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
@@ -2,6 +2,8 @@
#include <ydb/core/base/hive.h>
#include <ydb/core/blob_depot/blob_depot.h>
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/cms/console/console.h>
#include <ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.h>
#include <ydb/core/blobstorage/dsproxy/mock/model.h>
#include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h>
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
index c71ac3c1e6f..98bf955f42b 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
@@ -40,7 +40,9 @@ struct TEnvironmentSetup {
const TFeatureFlags FeatureFlags;
const NPDisk::EDeviceType DiskType = NPDisk::EDeviceType::DEVICE_TYPE_NVME;
const ui32 BurstThresholdNs = 0;
+ const ui32 MinHugeBlobInBytes = 0;
const float DiskTimeAvailableScale = 1;
+ const bool UseFakeConfigDispatcher = false;
};
const TSettings Settings;
@@ -68,6 +70,51 @@ struct TEnvironmentSetup {
}
};
+
+ class TFakeConfigDispatcher : public TActor<TFakeConfigDispatcher> {
+ std::unordered_set<TActorId> Subscribers;
+ TActorId EdgeId;
+ public:
+ TFakeConfigDispatcher()
+ : TActor<TFakeConfigDispatcher>(&TFakeConfigDispatcher::StateWork)
+ {
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle);
+ hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle)
+ hFunc(NConsole::TEvConsole::TEvConfigNotificationResponse, Handle)
+ hFunc(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle)
+ }
+ }
+
+ void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest::TPtr& ev) {
+ auto& items = ev->Get()->ConfigItemKinds;
+ if (items.at(0) != NKikimrConsole::TConfigItem::BlobStorageConfigItem) {
+ return;
+ }
+ Subscribers.emplace(ev->Sender);
+ }
+
+ void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
+ EdgeId = ev->Sender;
+ for (auto& id : Subscribers) {
+ auto update = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>();
+ update->Record.CopyFrom(ev->Get()->Record);
+ Send(id, update.Release());
+ }
+ }
+
+ void Handle(NConsole::TEvConsole::TEvConfigNotificationResponse::TPtr& ev) {
+ Forward(ev, EdgeId);
+ }
+
+ void Handle(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr& ev) {
+ Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release());
+ }
+ };
+
TEnvironmentSetup(bool vdiskReplPausedAtStart, TBlobStorageGroupType erasure = TBlobStorageGroupType::ErasureNone)
: TEnvironmentSetup(TSettings{
.VDiskReplPausedAtStart = vdiskReplPausedAtStart,
@@ -336,11 +383,22 @@ struct TEnvironmentSetup {
type->SetDiskTimeAvailableScale(Settings.DiskTimeAvailableScale);
}
+ {
+ auto* type = config->BlobStorageConfig.MutableVDiskPerformanceSettings()->AddVDiskTypes();
+ type->SetPDiskType(PDiskTypeToPDiskType(Settings.DiskType));
+ if (Settings.MinHugeBlobInBytes) {
+ type->SetMinHugeBlobSizeInBytes(Settings.MinHugeBlobInBytes);
+ }
+ }
+
warden.reset(CreateBSNodeWarden(config));
}
const TActorId wardenId = Runtime->Register(warden.release(), nodeId);
Runtime->RegisterService(MakeBlobStorageNodeWardenID(nodeId), wardenId);
+ if (Settings.UseFakeConfigDispatcher) {
+ Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(nodeId), Runtime->Register(new TFakeConfigDispatcher(), nodeId));
+ }
}
}
@@ -641,6 +699,16 @@ struct TEnvironmentSetup {
});
}
+ void PutBlob(const ui32 groupId, const TLogoBlobID& blobId, const TString& part) {
+ TActorId edge = Runtime->AllocateEdgeActor(Settings.ControllerNodeId);
+ Runtime->WrapInActorContext(edge, [&] {
+ SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(blobId, part, TInstant::Max(),
+ NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticMaxThroughput));
+ });
+ auto res = WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(edge);
+ Y_ABORT_UNLESS(res->Get()->Status == NKikimrProto::OK);
+ }
+
void CommenceReplication() {
for (ui32 groupId : GetGroups()) {
auto info = GetGroupInfo(groupId);
diff --git a/ydb/core/blobstorage/ut_blobstorage/replication_huge.cpp b/ydb/core/blobstorage/ut_blobstorage/replication_huge.cpp
new file mode 100644
index 00000000000..5f2124dce5d
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/replication_huge.cpp
@@ -0,0 +1,119 @@
+#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
+#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
+#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h>
+#include <util/system/info.h>
+
+
+enum class EState {
+ OK,
+ FORMAT
+};
+
+struct TReplTestSettings {
+ ui32 BlobSize;
+ ui32 MinHugeBlobSize;
+ ui32 MinHugeBlobSizeInRepl;
+};
+
+void DoTestCase(const TReplTestSettings& settings) {
+ using E = EState;
+ std::vector<EState> states = {E::OK, E::FORMAT, E::FORMAT, E::OK, E::OK, E::OK, E::OK, E::OK};
+ ui32 nodeCount = states.size();
+ TEnvironmentSetup env(TEnvironmentSetup::TSettings{
+ .NodeCount = nodeCount,
+ .Erasure = TBlobStorageGroupType::EErasureSpecies::Erasure4Plus2Block,
+ .ControllerNodeId = 1,
+ .MinHugeBlobInBytes = settings.MinHugeBlobSize,
+ .UseFakeConfigDispatcher = true,
+ });
+
+ env.CreateBoxAndPool(1, 1);
+ env.Sim(TDuration::Minutes(1));
+
+ auto groupId = env.GetGroups()[0];
+ TString data = TString::Uninitialized(settings.BlobSize);
+ memset(data.Detach(), 1, data.size());
+ TLogoBlobID id(1, 1, 1, 0, data.size(), 0);
+ env.PutBlob(groupId, id, data);
+ env.WaitForSync(env.GetGroupInfo(groupId), id);
+
+ auto checkBlob = [&] {
+ TActorId edge = env.Runtime->AllocateEdgeActor(env.Settings.ControllerNodeId);
+ env.Runtime->WrapInActorContext(edge, [&] {
+ SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
+ NKikimrBlobStorage::EGetHandleClass::FastRead));
+ });
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(edge);
+ auto *msg = res->Get();
+ Y_ABORT_UNLESS(msg->ResponseSz == 1);
+ return msg->Responses[0].Status;
+ };
+
+ Y_ABORT_UNLESS(checkBlob() == NKikimrProto::OK);
+ env.Cleanup();
+
+ for (auto& [key, state] : env.PDiskMockStates) {
+ if (states[key.first - 1] == E::FORMAT) {
+ state.Reset();
+ }
+ }
+
+ env.Initialize();
+
+ std::vector<std::pair<ui32, std::unique_ptr<IEventHandle>>> detainedMsgs;
+ env.Runtime->FilterFunction = [&](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == TEvBlobStorage::EvReplStarted) {
+ detainedMsgs.emplace_back(nodeId, std::move(ev));
+ return false;
+ }
+ return true;
+ };
+
+ env.Sim(TDuration::Minutes(10));
+ Y_ABORT_UNLESS(checkBlob() == NKikimrProto::OK);
+ Y_ABORT_IF(detainedMsgs.empty());
+
+ env.Runtime->FilterFunction = {};
+
+ // replication is about to start, updating the minHugeBlobSize in skeleton and resuming replication
+ for (auto& [nodeId, detainedEv] : detainedMsgs) {
+ TActorId edge = env.Runtime->AllocateEdgeActor(nodeId);
+
+ auto request = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>();
+ auto perfConfig = NKikimrConfig::TBlobStorageConfig_TVDiskPerformanceConfig();
+ perfConfig.SetPDiskType(PDiskTypeToPDiskType(env.Settings.DiskType));
+ perfConfig.SetMinHugeBlobSizeInBytes(settings.MinHugeBlobSizeInRepl);
+ auto* vdiskTypes = request->Record.MutableConfig()->MutableBlobStorageConfig()->MutableVDiskPerformanceSettings()->MutableVDiskTypes();
+ vdiskTypes->Add(std::move(perfConfig));
+
+ env.Runtime->Send(new IEventHandle(NConsole::MakeConfigsDispatcherID(nodeId), edge, request.Release()), nodeId);
+ Y_ABORT_UNLESS(env.Runtime->WaitForEdgeActorEvent({edge})->CastAsLocal<NConsole::TEvConsole::TEvConfigNotificationResponse>());
+
+ env.Runtime->Send(detainedEv.release(), nodeId);
+ }
+
+ // waiting for replication to complete
+ env.WaitForSync(env.GetGroupInfo(groupId), id);
+
+ UNIT_ASSERT_EQUAL(checkBlob(), NKikimrProto::OK);
+}
+
+Y_UNIT_TEST_SUITE(MinHugeChangeOnReplication) {
+
+ Y_UNIT_TEST(MinHugeDecreased) {
+ DoTestCase(TReplTestSettings{
+ .BlobSize = 200u << 10,
+ .MinHugeBlobSize = 64u << 10,
+ .MinHugeBlobSizeInRepl = 512u << 10,
+ });
+ }
+
+ Y_UNIT_TEST(MinHugeIncreased) {
+ DoTestCase(TReplTestSettings{
+ .BlobSize = 200u << 10,
+ .MinHugeBlobSize = 512u << 10,
+ .MinHugeBlobSizeInRepl = 10u << 10,
+ });
+ }
+
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make
index ae6c2101bc1..bd564dba019 100644
--- a/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make
+++ b/ydb/core/blobstorage/ut_blobstorage/ut_replication/ya.make
@@ -10,6 +10,7 @@ TAG(ya:fat)
SRCS(
replication.cpp
+ replication_huge.cpp
)
PEERDIR(
diff --git a/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp b/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp
index da5ea2e1604..d32b7bfa7ad 100644
--- a/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/gen_restarts.cpp
@@ -49,7 +49,7 @@ void ChaoticWriteRestartWrite(const TChaoticWriteRestartWriteSettings &settings,
UNIT_ASSERT(success1);
Conf.Shutdown();
- Conf.Prepare(settings.WriteRunSetup.get(), false);
+ Conf.Prepare(settings.SecondWriteRunSetup.get(), false);
auto cls2 = std::make_shared<TPutHandleClassGenerator>(settings.Cls);
TChaoticManyPutsTest x(settings.Parallel, 1, settings.MsgSize, cls2, settings.WorkingTime,
settings.RequestTimeout);
diff --git a/ydb/core/blobstorage/ut_vdisk/gen_restarts.h b/ydb/core/blobstorage/ut_vdisk/gen_restarts.h
index 4f3e170c0de..70547e403f9 100644
--- a/ydb/core/blobstorage/ut_vdisk/gen_restarts.h
+++ b/ydb/core/blobstorage/ut_vdisk/gen_restarts.h
@@ -79,21 +79,37 @@ struct TMultiPutWriteRestartReadSettings {
};
struct TChaoticWriteRestartWriteSettings : public TWriteRestartReadSettings {
- // number of parallel writes/reads (ignored if just single read/write)
+ // number of parallel writes/reads (ignored if just single read/write)]
+ const std::shared_ptr<IVDiskSetup> SecondWriteRunSetup;
const ui32 Parallel;
const TDuration WorkingTime;
const TDuration RequestTimeout;
TChaoticWriteRestartWriteSettings(
const TWriteRestartReadSettings &baseSettings,
+ std::shared_ptr<IVDiskSetup> secondWriteRunSetup,
ui32 parallel,
TDuration workingTime,
TDuration requestTimeout)
: TWriteRestartReadSettings(baseSettings)
+ , SecondWriteRunSetup(secondWriteRunSetup)
, Parallel(parallel)
, WorkingTime(workingTime)
, RequestTimeout(requestTimeout)
{}
+
+ TChaoticWriteRestartWriteSettings(
+ const TWriteRestartReadSettings &baseSettings,
+ ui32 parallel,
+ TDuration workingTime,
+ TDuration requestTimeout)
+ : TWriteRestartReadSettings(baseSettings)
+ , SecondWriteRunSetup(baseSettings.WriteRunSetup)
+ , Parallel(parallel)
+ , WorkingTime(workingTime)
+ , RequestTimeout(requestTimeout)
+ {}
+
};
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/setup.h b/ydb/core/blobstorage/ut_vdisk/lib/setup.h
index 5b7dd9089e2..cc4a2e2da2f 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/setup.h
+++ b/ydb/core/blobstorage/ut_vdisk/lib/setup.h
@@ -24,6 +24,15 @@ struct TFastVDiskSetup : public TDefaultVDiskSetup {
}
};
+struct TFastVDiskSetupMinHugeBlob : public TFastVDiskSetup {
+ TFastVDiskSetupMinHugeBlob(ui32 minHugeBlobInBytes) {
+ auto modifier = [=] (NKikimr::TVDiskConfig *cfg) {
+ cfg->MinHugeBlobInBytes = minHugeBlobInBytes;
+ };
+ AddConfigModifier(modifier);
+ }
+};
+
struct TFastVDiskSetupSmallVDiskQueues : public TFastVDiskSetup {
TFastVDiskSetupSmallVDiskQueues() {
auto modifier = [] (NKikimr::TVDiskConfig *cfg) {
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp b/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp
index 184cfba946e..518c675b8af 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp
@@ -206,6 +206,7 @@ private:
nullptr,
nullptr, // PDiskCtx
nullptr, // HugeBlobCtx
+ 4097,
nullptr,
MakeIntrusive<TBlobStorageGroupInfo>(groupInfo),
ctx.SelfID,
diff --git a/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp b/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp
index 8ec7fb3acef..422b139e9fd 100644
--- a/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/vdisk_test.cpp
@@ -471,6 +471,20 @@ Y_UNIT_TEST_SUITE(TBsLocalRecovery) {
WriteRestartRead(settings, TIMEOUT);
}
+ Y_UNIT_TEST(WriteRestartReadHugeIncreased) {
+ auto vdiskWriteSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u);
+ auto vdiskReadSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60u << 10u);
+ auto settings = TWriteRestartReadSettings(1000, 20u << 10u, HUGEB, vdiskWriteSetup, vdiskReadSetup);
+ WriteRestartRead(settings, TIMEOUT);
+ }
+
+ Y_UNIT_TEST(WriteRestartReadHugeDecreased) {
+ auto vdiskWriteSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60 << 10u);
+ auto vdiskReadSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u);
+ auto settings = TWriteRestartReadSettings(1000, 20u << 10u, HUGEB, vdiskWriteSetup, vdiskReadSetup);
+ WriteRestartRead(settings, TIMEOUT);
+ }
+
Y_UNIT_TEST(MultiPutWriteRestartRead) {
auto vdiskSetup = std::make_shared<TFastVDiskSetup>();
auto settings = TMultiPutWriteRestartReadSettings::OneSetup(1000, 10, 10, UNK, vdiskSetup);
@@ -547,6 +561,30 @@ Y_UNIT_TEST_SUITE(TBsLocalRecovery) {
TDuration::Seconds(0));
ChaoticWriteRestartWrite(settings, TIMEOUT);
}
+
+ Y_UNIT_TEST(ChaoticWriteRestartHugeIncreased) {
+ auto vdiskSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u);
+ auto vdiskSecondSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60u << 10u);
+ TChaoticWriteRestartWriteSettings settings(
+ TWriteRestartReadSettings::OneSetup(300, 20u << 10u, HUGEB, vdiskSetup),
+ vdiskSecondSetup,
+ 500,
+ TDuration::Seconds(10),
+ TDuration::Seconds(0));
+ ChaoticWriteRestartWrite(settings, TIMEOUT);
+ }
+
+ Y_UNIT_TEST(ChaoticWriteRestartHugeDecreased) {
+ auto vdiskSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(60u << 10u);
+ auto vdiskSecondSetup = std::make_shared<TFastVDiskSetupMinHugeBlob>(8u << 10u);
+ TChaoticWriteRestartWriteSettings settings(
+ TWriteRestartReadSettings::OneSetup(300, 20u << 10u, HUGEB, vdiskSetup),
+ vdiskSecondSetup,
+ 500,
+ TDuration::Seconds(10),
+ TDuration::Seconds(0));
+ ChaoticWriteRestartWrite(settings, TIMEOUT);
+ }
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/ut_vdisk2/defs.h b/ydb/core/blobstorage/ut_vdisk2/defs.h
index 13e976d338c..5359b1892e1 100644
--- a/ydb/core/blobstorage/ut_vdisk2/defs.h
+++ b/ydb/core/blobstorage/ut_vdisk2/defs.h
@@ -5,5 +5,7 @@
#include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_config.h>
#include <ydb/core/blobstorage/vdisk/vdisk_actor.h>
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/cms/console/console.h>
#include <ydb/core/util/testactorsys.h>
#include <library/cpp/testing/unittest/registar.h>
diff --git a/ydb/core/blobstorage/ut_vdisk2/env.h b/ydb/core/blobstorage/ut_vdisk2/env.h
index f1019a87408..e15da631531 100644
--- a/ydb/core/blobstorage/ut_vdisk2/env.h
+++ b/ydb/core/blobstorage/ut_vdisk2/env.h
@@ -21,6 +21,50 @@ namespace NKikimr {
TIntrusivePtr<TPDiskMockState> PDiskMockState;
std::unordered_map<NKikimrBlobStorage::EVDiskQueueId, TActorId> QueueIds;
+ class TFakeConfigDispatcher : public TActor<TFakeConfigDispatcher> {
+ std::unordered_set<TActorId> Subscribers;
+ TActorId EdgeId;
+ public:
+ TFakeConfigDispatcher()
+ : TActor<TFakeConfigDispatcher>(&TFakeConfigDispatcher::StateWork)
+ {
+ }
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle);
+ hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle)
+ hFunc(NConsole::TEvConsole::TEvConfigNotificationResponse, Handle)
+ hFunc(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle)
+ }
+ }
+
+ void Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest::TPtr& ev) {
+ auto& items = ev->Get()->ConfigItemKinds;
+ if (items.at(0) != NKikimrConsole::TConfigItem::BlobStorageConfigItem) {
+ return;
+ }
+ Subscribers.emplace(ev->Sender);
+ }
+
+ void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
+ EdgeId = ev->Sender;
+ for (auto& id : Subscribers) {
+ auto update = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>();
+ update->Record.CopyFrom(ev->Get()->Record);
+ Send(id, update.Release());
+ }
+ }
+
+ void Handle(NConsole::TEvConsole::TEvConfigNotificationResponse::TPtr& ev) {
+ Forward(ev, EdgeId);
+ }
+
+ void Handle(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr& ev) {
+ Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release());
+ }
+ };
+
public:
TTestEnv(TIntrusivePtr<TPDiskMockState> state = nullptr)
: Runtime(std::make_unique<TTestActorSystem>(1))
@@ -31,6 +75,7 @@ namespace NKikimr {
SetupLogging();
Runtime->Start();
CreatePDisk();
+ CreateConfigDispatcher();
CreateVDisk();
CreateQueues();
}
@@ -78,6 +123,23 @@ namespace NKikimr {
NKikimrBlobStorage::EVDiskQueueId::PutTabletLog);
}
+ void ChangeMinHugeBlobSize(ui32 minHugeBlobSize) {
+ const TActorId& edge = Runtime->AllocateEdgeActor(NodeId);
+ auto request = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>();
+ auto perfConfig = NKikimrConfig::TBlobStorageConfig_TVDiskPerformanceConfig();
+ perfConfig.SetPDiskType(PDiskTypeToPDiskType(VDiskConfig->BaseInfo.DeviceType));
+ perfConfig.SetMinHugeBlobSizeInBytes(minHugeBlobSize);
+
+ auto* vdiskTypes = request->Record.MutableConfig()->MutableBlobStorageConfig()->MutableVDiskPerformanceSettings()->MutableVDiskTypes();
+ vdiskTypes->Add(std::move(perfConfig));
+
+ Runtime->Send(new IEventHandle(NConsole::MakeConfigsDispatcherID(NodeId), edge, request.Release()), NodeId);
+ auto ev = Runtime->WaitForEdgeActorEvent({edge});
+ Runtime->DestroyActor(edge);
+ auto *msg = ev->CastAsLocal<NConsole::TEvConsole::TEvConfigNotificationResponse>();
+ UNIT_ASSERT(msg);
+ }
+
private:
template<typename TEvVResult>
decltype(std::declval<TEvVResult>().Record) ExecuteQuery(std::unique_ptr<IEventBase> query,
@@ -126,6 +188,10 @@ namespace NKikimr {
}
}
+ void CreateConfigDispatcher() {
+ Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(NodeId), Runtime->Register(new TFakeConfigDispatcher(), NodeId));
+ }
+
static NKikimrBlobStorage::EVDiskQueueId GetQueueId(NKikimrBlobStorage::EPutHandleClass prio) {
switch (prio) {
case NKikimrBlobStorage::EPutHandleClass::TabletLog:
diff --git a/ydb/core/blobstorage/ut_vdisk2/huge.cpp b/ydb/core/blobstorage/ut_vdisk2/huge.cpp
index 83fd45e3358..153a2a8e366 100644
--- a/ydb/core/blobstorage/ut_vdisk2/huge.cpp
+++ b/ydb/core/blobstorage/ut_vdisk2/huge.cpp
@@ -13,7 +13,9 @@ Y_UNIT_TEST_SUITE(VDiskTest) {
char value = 1;
std::vector<TString> blobValues;
- for (const ui32 size : {10, 1024, 576 * 1024, 1024 * 1024, 1536 * 1024}) {
+ std::vector<ui32> minHugeBlobValues = {8 * 1024, 12 * 1024, 60 * 1024, 64 * 1024, 512 * 1024};
+
+ for (const ui32 size : {10, 1024, 40 * 1024, 576 * 1024, 1024 * 1024, 1536 * 1024}) {
for (ui32 i = 0; i < 10; ++i) {
TString data = TString::Uninitialized(size);
memset(data.Detach(), value++, data.size());
@@ -53,6 +55,7 @@ Y_UNIT_TEST_SUITE(VDiskTest) {
ui64 minTotalSize = (ui64)4 << 30;
ui64 totalSize = 0;
ui8 channel = 0;
+ ui32 lastMinHugeBlobValue = 0;
while (TInstant::Now() < end) {
const ui64 tabletId = tabletIds[RandomNumber(tabletIds.size())];
@@ -68,6 +71,16 @@ Y_UNIT_TEST_SUITE(VDiskTest) {
content.emplace(id, &data);
totalSize += data.size();
+ if (RandomNumber(1000u) < 3) {
+ ui32 minHugeBlobValue;
+ do {
+ minHugeBlobValue = minHugeBlobValues[RandomNumber(minHugeBlobValues.size())];
+ } while (minHugeBlobValue == lastMinHugeBlobValue);
+ lastMinHugeBlobValue = minHugeBlobValue;
+ env->ChangeMinHugeBlobSize(minHugeBlobValue);
+ Cerr << "Change MinHugeBlobSize# " << minHugeBlobValue << Endl;
+ }
+
if (totalSize > maxTotalSize || (totalSize >= minTotalSize && RandomNumber(1000u) < 3)) {
std::vector<TLogoBlobID> options;
options.reserve(content.size());
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 1daaca12dea..b714f2199e2 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -3121,4 +3121,14 @@ namespace NKikimr {
struct TEvPermitGarbageCollection : TEventLocal<TEvPermitGarbageCollection, TEvBlobStorage::EvPermitGarbageCollection> {};
+ ////////////////////////////////////////////////////////////////////////////
+ // TEvMinHugeBlobSizeUpdate
+ ////////////////////////////////////////////////////////////////////////////
+ class TEvMinHugeBlobSizeUpdate : public TEventLocal<TEvMinHugeBlobSizeUpdate, TEvBlobStorage::EvMinHugeBlobSizeUpdate> {
+ public:
+ ui32 MinREALHugeBlobInBytes;
+
+ TEvMinHugeBlobSizeUpdate(ui32 minREALHugeBlobInBytes) : MinREALHugeBlobInBytes(minREALHugeBlobInBytes) {
+ };
+ };
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp
index 89bb2d018ea..f8d4b2fec47 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp
@@ -16,6 +16,12 @@ namespace NKikimr {
return &AllSlotsInfo.at(idx);
}
+ ui32 THugeSlotsMap::AlignByBlockSize(ui32 size) const {
+ ui32 sizeInBlocks = size / AppendBlockSize;
+ Y_ABORT_UNLESS(sizeInBlocks, "Blob size to align is smaller than a single block. BlobSize# %" PRIu32, size);
+ return sizeInBlocks * AppendBlockSize;
+ }
+
void THugeSlotsMap::Output(IOutputStream &str) const {
str << "{AllSlotsInfo# [\n";
for (const auto &x : AllSlotsInfo) {
@@ -42,8 +48,8 @@ namespace NKikimr {
}
// check whether this blob is huge one; userPartSize doesn't include any metadata stored along with blob
- bool THugeBlobCtx::IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const {
- return gtype.MaxPartSize(fullId) + (AddHeader ? TDiskBlob::HeaderSize : 0) >= MinREALHugeBlobInBytes;
+ bool THugeBlobCtx::IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, ui32 minREALHugeBlobInBytes) const {
+ return gtype.MaxPartSize(fullId) + (AddHeader ? TDiskBlob::HeaderSize : 0) >= minREALHugeBlobInBytes;
}
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
index 9045fce089e..1c08c0be927 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
@@ -49,6 +49,7 @@ namespace NKikimr {
THugeSlotsMap(ui32 appendBlockSize, TAllSlotsInfo &&slotsInfo, TSearchTable &&searchTable);
const TSlotInfo *GetSlotInfo(ui32 size) const;
+ ui32 AlignByBlockSize(ui32 size) const;
void Output(IOutputStream &str) const;
TString ToString() const;
@@ -64,19 +65,16 @@ namespace NKikimr {
class THugeBlobCtx {
public:
// this value is multiply of AppendBlockSize and is calculated from Config->MinHugeBlobSize
- const ui32 MinREALHugeBlobInBytes;
const std::shared_ptr<const THugeSlotsMap> HugeSlotsMap;
const bool AddHeader;
// check whether this NEW blob is huge one; userPartSize doesn't include any metadata stored along with blob
- bool IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const;
+ bool IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, ui32 minREALHugeBlobInBytes) const;
- THugeBlobCtx(ui32 minHugeBlobInBytes, ui32 appendBlockSize, const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap, bool addHeader)
- : MinREALHugeBlobInBytes(minHugeBlobInBytes / appendBlockSize * appendBlockSize + 1)
- , HugeSlotsMap(hugeSlotsMap)
+ THugeBlobCtx(const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap, bool addHeader)
+ : HugeSlotsMap(hugeSlotsMap)
, AddHeader(addHeader)
{
- Y_ABORT_UNLESS(MinREALHugeBlobInBytes >= appendBlockSize);
}
};
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
index d819080ca78..28fa9e6f37d 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_mongroups.h
@@ -520,7 +520,11 @@ public:
}
void MinHugeBlobInBytes(ui32 size) {
+ if (PrevMinHugeBlobInBytes) {
+ GroupCounters->GetNamedCounter("MinHugeBlobInBytes", ToString(PrevMinHugeBlobInBytes), false)->Dec();
+ }
GroupCounters->GetNamedCounter("MinHugeBlobInBytes", ToString(size), false)->Inc();
+ PrevMinHugeBlobInBytes = size;
}
COUNTER_DEF(MovedPatchMsgs);
@@ -564,6 +568,8 @@ public:
COUNTER_DEF(PutTotalBytes);
COUNTER_DEF(GetTotalBytes);
+ private:
+ ui32 PrevMinHugeBlobInBytes = 0;
};
///////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp
index f87c1a105b6..df27d118e98 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ctx_ut.cpp
@@ -35,8 +35,6 @@ namespace NKikimr {
logFunc);
return std::make_shared<THugeBlobCtx>(
- ChunkSize,
- AppendBlockSize,
repairedHuge->Heap->BuildHugeSlotsMap(),
true);
}
diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp
index cbdfb3f510a..0aa301c7e3e 100644
--- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp
+++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp
@@ -448,8 +448,6 @@ namespace NKikimr {
lsn, entryPoint, logFunc);
}
HugeBlobCtx = std::make_shared<THugeBlobCtx>(
- Config->MinHugeBlobInBytes,
- LocRecCtx->PDiskCtx->Dsk->AppendBlockSize,
LocRecCtx->RepairedHuge->Heap->BuildHugeSlotsMap(),
Config->AddHeader);
HugeKeeperInitialized = true;
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp
index 208672f6067..b1eeba13a82 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp
@@ -14,7 +14,7 @@ std::shared_ptr<TReplCtx> CreateReplCtx(TVector<TVDiskID>& vdisks, const TIntrus
auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
auto vctx = MakeIntrusive<TVDiskContext>(TActorId(), info->PickTopology(), counters, TVDiskID(0, 1, 0, 0, 0),
nullptr, NPDisk::DEVICE_TYPE_UNKNOWN);
- auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(512u << 10u, 4096u, nullptr, true);
+ auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(nullptr, true);
auto dsk = MakeIntrusive<TPDiskParams>(ui8(1), 1u, 128u << 20, 4096u, 0u, 1000000000u, 1000000000u, 65536u, 65536u, 65536u);
auto pdiskCtx = std::make_shared<TPDiskCtx>(dsk, TActorId(), TString());
auto replCtx = std::make_shared<TReplCtx>(
@@ -22,6 +22,7 @@ std::shared_ptr<TReplCtx> CreateReplCtx(TVector<TVDiskID>& vdisks, const TIntrus
nullptr,
pdiskCtx,
hugeBlobCtx,
+ 4097,
nullptr,
info,
TActorId(),
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
index e48f236ba3c..bdec548c93e 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp
@@ -156,6 +156,7 @@ namespace NKikimr {
};
std::shared_ptr<TReplCtx> ReplCtx;
+ ui32 NextMinREALHugeBlobInBytes;
THistory History;
EState State;
TInstant LastReplStart;
@@ -252,6 +253,10 @@ namespace NKikimr {
}
}
+ void Handle(TEvMinHugeBlobSizeUpdate::TPtr ev) {
+ NextMinREALHugeBlobInBytes = ev->Get()->MinREALHugeBlobInBytes;
+ }
+
void StartReplication() {
STLOG(PRI_DEBUG, BS_REPL, BSVR14, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "REPL START"));
STLOG(PRI_DEBUG, BS_REPL, BSVR15, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "QUANTUM START"));
@@ -262,6 +267,8 @@ namespace NKikimr {
ReplCtx->MonGroup.ReplWorkUnitsDone() = 0;
ReplCtx->MonGroup.ReplItemsRemaining() = 0;
ReplCtx->MonGroup.ReplItemsDone() = 0;
+ Y_ABORT_UNLESS(NextMinREALHugeBlobInBytes);
+ ReplCtx->MinREALHugeBlobInBytes = NextMinREALHugeBlobInBytes;
UnrecoveredNonphantomBlobs = false;
Become(&TThis::StateRepl);
@@ -609,6 +616,7 @@ namespace NKikimr {
cFunc(TEvBlobStorage::EvCommenceRepl, StartReplication)
hFunc(TEvReplInvoke, Handle)
hFunc(TEvReplCheckProgress, ReplProgressWatchdog)
+ hFunc(TEvMinHugeBlobSizeUpdate, Handle)
)
void PassAway() override {
@@ -652,6 +660,7 @@ namespace NKikimr {
cFunc(TEvBlobStorage::EvCommenceRepl, Ignore)
hFunc(TEvReplInvoke, Handle)
hFunc(TEvReplCheckProgress, ReplProgressWatchdog)
+ hFunc(TEvMinHugeBlobSizeUpdate, Handle)
)
public:
@@ -662,6 +671,7 @@ namespace NKikimr {
TReplScheduler(std::shared_ptr<TReplCtx> &replCtx)
: TActorBootstrapped<TReplScheduler>()
, ReplCtx(replCtx)
+ , NextMinREALHugeBlobInBytes(ReplCtx->MinREALHugeBlobInBytes)
, History(HistorySize)
, State(Relaxation)
, ReplProgressWatchdog(
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h
index 8795969ae13..a8ac714d52c 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replctx.h
@@ -15,6 +15,7 @@ namespace NKikimr {
TIntrusivePtr<THullCtx> HullCtx;
TPDiskCtxPtr PDiskCtx;
std::shared_ptr<THugeBlobCtx> HugeBlobCtx;
+ ui32 MinREALHugeBlobInBytes;
TIntrusivePtr<THullDs> HullDs;
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
TActorId SkeletonId;
@@ -30,6 +31,7 @@ namespace NKikimr {
TIntrusivePtr<THullCtx> hullCtx,
TPDiskCtxPtr pdiskCtx,
std::shared_ptr<THugeBlobCtx> hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
TIntrusivePtr<THullDs> hullDs,
TIntrusivePtr<TBlobStorageGroupInfo> info,
const TActorId &skeletonId,
@@ -40,6 +42,7 @@ namespace NKikimr {
, HullCtx(std::move(hullCtx))
, PDiskCtx(std::move(pdiskCtx))
, HugeBlobCtx(std::move(hugeBlobCtx))
+ , MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
, HullDs(std::move(hullDs))
, GInfo(std::move(info))
, SkeletonId(skeletonId)
@@ -47,7 +50,9 @@ namespace NKikimr {
, VDiskCfg(std::move(vdiskCfg))
, PDiskWriteBytes(std::move(pdiskWriteBytes))
, PausedAtStart(pausedAtStart)
- {}
+ {
+ Y_ABORT_UNLESS(MinREALHugeBlobInBytes);
+ }
bool GetAddHeader() const { return !HullCtx || HullCtx->AddHeader; }
};
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
index ea41a013e12..2c8e20602d8 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine.h
@@ -204,7 +204,7 @@ namespace NKikimr {
partsSize += partSize;
TRope& data = item.Parts[i];
Y_ABORT_UNLESS(data.GetSize() == partSize);
- if (ReplCtx->HugeBlobCtx->IsHugeBlob(groupType, id)) {
+ if (ReplCtx->HugeBlobCtx->IsHugeBlob(groupType, id, ReplCtx->MinREALHugeBlobInBytes)) {
AddBlobToQueue(partId, TDiskBlob::Create(id.BlobSize(), i + 1, groupType.TotalPartCount(),
std::move(data), Arena, ReplCtx->GetAddHeader()), {}, true, rbq);
++numHuge;
@@ -378,7 +378,7 @@ namespace NKikimr {
void RecoverMetadata(const TLogoBlobID& id, TRecoveredBlobsQueue& rbq) {
while (!MetadataParts.empty() && MetadataParts.front().FullID() <= id) {
const TLogoBlobID id = MetadataParts.front();
- const bool isHugeBlob = ReplCtx->HugeBlobCtx->IsHugeBlob(ReplCtx->VCtx->Top->GType, id.FullID());
+ const bool isHugeBlob = ReplCtx->HugeBlobCtx->IsHugeBlob(ReplCtx->VCtx->Top->GType, id.FullID(), ReplCtx->MinREALHugeBlobInBytes);
MetadataParts.pop_front();
STLOG(PRI_DEBUG, BS_REPL, BSVR30, VDISKP(ReplCtx->VCtx->VDiskLogPrefix,
"TRecoveryMachine::RecoverMetadata"), (BlobId, id));
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
index c5b434eb774..fcc5e1c923e 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_replrecoverymachine_ut.cpp
@@ -66,12 +66,13 @@ namespace NKikimr {
auto counters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
auto vctx = MakeIntrusive<TVDiskContext>(TActorId(), info->PickTopology(), counters, TVDiskID(0, 1, 0, 0, 0),
nullptr, NPDisk::DEVICE_TYPE_UNKNOWN);
- auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(512u << 10u, 4096u, nullptr, true);
+ auto hugeBlobCtx = std::make_shared<THugeBlobCtx>(nullptr, true);
auto replCtx = std::make_shared<TReplCtx>(
vctx,
nullptr, // HullCtx
nullptr, // PDiskCtx
hugeBlobCtx,
+ 4097,
nullptr,
info,
TActorId(),
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 2285d725101..632faf21a80 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -44,6 +44,8 @@
#include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h>
#include <ydb/core/blobstorage/vdisk/defrag/defrag_actor.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_internal_interface.h>
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/cms/console/console.h>
#include <ydb/core/protos/node_whiteboard.pb.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <library/cpp/monlib/service/pages/templates.h>
@@ -173,6 +175,41 @@ namespace NKikimr {
}
}
+ void Handle(const NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr &ev, const TActorContext &ctx) {
+ auto& record = ev->Get()->Record;
+ ctx.Send(ev->Sender, new NConsole::TEvConsole::TEvConfigNotificationResponse(record), 0, ev->Cookie);
+ if (!record.HasConfig() ) {
+ return;
+ }
+ if (const auto& config = record.GetConfig(); config.HasBlobStorageConfig() && config.GetBlobStorageConfig().HasVDiskPerformanceSettings()) {
+ for (auto &type : config.GetBlobStorageConfig().GetVDiskPerformanceSettings().GetVDiskTypes()) {
+ if (!type.HasPDiskType() || Config->BaseInfo.DeviceType != PDiskTypeToPDiskType(type.GetPDiskType())) {
+ continue;
+ }
+ if (!type.HasMinHugeBlobSizeInBytes()) {
+ continue;
+ }
+ if (!ApplyHugeBlobSize(type.GetMinHugeBlobSizeInBytes())) {
+ continue;
+ }
+ if (Config->RunRepl) {
+ ctx.Send(Db->ReplID, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes));
+ }
+ ctx.Send(*SkeletonFrontIDPtr, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes));
+ }
+ }
+ }
+
+ bool ApplyHugeBlobSize(ui32 minHugeBlobInBytes) {
+ ui32 alignedSize = HugeBlobCtx->HugeSlotsMap->AlignByBlockSize(minHugeBlobInBytes) + 1;
+ if (MinREALHugeBlobInBytes == alignedSize) {
+ return false;
+ }
+ MinREALHugeBlobInBytes = alignedSize;
+ IFaceMonGroup->MinHugeBlobInBytes(MinREALHugeBlobInBytes);
+ return true;
+ }
+
////////////////////////////////////////////////////////////////////////
// SEND REPLY
////////////////////////////////////////////////////////////////////////
@@ -511,7 +548,7 @@ namespace NKikimr {
TVPutInfo &info = putsInfo.back();
try {
- info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, blobId.FullID());
+ info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, blobId.FullID(), MinREALHugeBlobInBytes);
if (info.IsHugeBlob) {
LOG_CRIT_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << "TEvVMultiPut: TEvVMultiPut has huge blob# "
<< blobId << " Marker# BSVS08");
@@ -654,7 +691,7 @@ namespace NKikimr {
const ui64 bufSize = info.Buffer.GetSize();
try {
- info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID());
+ info.IsHugeBlob = HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID(), MinREALHugeBlobInBytes);
} catch (yexception ex) {
LOG_ERROR_S(ctx, BS_VDISK_PUT, VCtx->VDiskLogPrefix << ex.what() << " Marker# BSVS41");
info.HullStatus = {NKikimrProto::ERROR, 0, false};
@@ -1630,8 +1667,8 @@ namespace NKikimr {
TRope buf = std::move(msg->Data);
const ui64 bufSize = buf.GetSize();
- Y_ABORT_UNLESS(bufSize <= Config->MaxLogoBlobDataSize && HugeBlobCtx->IsHugeBlob(VCtx->Top->GType, id.FullID()),
- "TEvRecoveredHugeBlob: blob is too small/huge bufSize# %zu", bufSize);
+ Y_ABORT_UNLESS(bufSize <= Config->MaxLogoBlobDataSize,
+ "TEvRecoveredHugeBlob: blob is huge bufSize# %zu", bufSize);
UpdatePDiskWriteBytes(bufSize);
auto oosStatus = VCtx->GetOutOfSpaceState().GetGlobalStatusFlags();
@@ -1716,9 +1753,14 @@ namespace NKikimr {
auto msg = std::make_unique<TEvFrontRecoveryStatus>(TEvFrontRecoveryStatus::SyncGuidRecoveryDone,
NKikimrProto::OK,
(PDiskCtx ? PDiskCtx->Dsk : nullptr),
- HugeBlobCtx,
+ MinREALHugeBlobInBytes,
Db->GetVDiskIncarnationGuid());
ctx.Send(*SkeletonFrontIDPtr, msg.release());
+ ctx.Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest(
+ NKikimrConsole::TConfigItem::BlobStorageConfigItem,
+ SelfId()
+ ));
+
Hull->PermitGarbageCollection(ctx);
// propagate status to Node Warden unless replication is on -- in that case it sets the status itself
if (!runRepl) {
@@ -1738,7 +1780,7 @@ namespace NKikimr {
auto msg = std::make_unique<TEvFrontRecoveryStatus>(phase,
NKikimrProto::ERROR,
(PDiskCtx ? PDiskCtx->Dsk : nullptr),
- HugeBlobCtx,
+ MinREALHugeBlobInBytes,
Db->GetVDiskIncarnationGuid());
ctx.Send(*SkeletonFrontIDPtr, msg.release());
// push the status
@@ -1785,7 +1827,7 @@ namespace NKikimr {
// check status
if (ev->Get()->Status == NKikimrProto::OK) {
- IFaceMonGroup->MinHugeBlobInBytes(HugeBlobCtx->MinREALHugeBlobInBytes);
+ ApplyHugeBlobSize(Config->MinHugeBlobInBytes);
// handle special case when donor disk starts and finds out that it has been wiped out
if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) {
// send drop donor cmd to NodeWarden
@@ -1802,7 +1844,7 @@ namespace NKikimr {
auto msg = std::make_unique<TEvFrontRecoveryStatus>(TEvFrontRecoveryStatus::LocalRecoveryDone,
NKikimrProto::OK,
PDiskCtx->Dsk,
- HugeBlobCtx,
+ MinREALHugeBlobInBytes,
Db->GetVDiskIncarnationGuid());
ctx.Send(*SkeletonFrontIDPtr, msg.release());
@@ -1965,7 +2007,7 @@ namespace NKikimr {
DbBirthLsn = ev->Get()->DbBirthLsn;
SkeletonIsUpAndRunning(ctx, Config->RunRepl);
if (Config->RunRepl) {
- auto replCtx = std::make_shared<TReplCtx>(VCtx, HullCtx, PDiskCtx, HugeBlobCtx, Hull->GetHullDs(),
+ auto replCtx = std::make_shared<TReplCtx>(VCtx, HullCtx, PDiskCtx, HugeBlobCtx, MinREALHugeBlobInBytes, Hull->GetHullDs(),
GInfo, SelfId(), Config, PDiskWriteBytes, Config->ReplPausedAtStart);
Db->ReplID.Set(ctx.Register(CreateReplActor(replCtx)));
ActiveActors.Insert(Db->ReplID, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); // keep forever
@@ -2353,6 +2395,13 @@ namespace NKikimr {
TActivationContext::Send(new IEventHandle(TEvBlobStorage::EvCommenceRepl, 0, Db->ReplID, SelfId(), nullptr, 0));
}
}
+
+ void PassAway() override {
+ Send(NConsole::MakeConfigsDispatcherID(SelfId().NodeId()), new NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest(
+ SelfId()
+ ));
+ TActor::PassAway();
+ }
void ForwardToScrubActor(STFUNC_SIG) {
Forward(ev, ScrubId);
@@ -2595,6 +2644,8 @@ namespace NKikimr {
)
STRICT_STFUNC(StateNormal,
+ IgnoreFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse);
+ HFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
HFunc(TEvBlobStorage::TEvVMovedPatch, Handle)
HFunc(TEvBlobStorage::TEvVPatchStart, Handle)
HFunc(TEvBlobStorage::TEvVPatchDiff, HandleVPatchDiffResending)
@@ -2741,6 +2792,7 @@ namespace NKikimr {
THullCtxPtr HullCtx;
THugeBlobCtxPtr HugeBlobCtx;
std::shared_ptr<THullLogCtx> HullLogCtx;
+ ui32 MinREALHugeBlobInBytes;
std::shared_ptr<THull> Hull; // run it after local recovery
std::shared_ptr<TOutOfSpaceLogic> OutOfSpaceLogic;
std::shared_ptr<TQueryCtx> QueryCtx;
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index 49e439a054a..c2a83310e96 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -41,12 +41,12 @@ namespace NKikimr {
TEvFrontRecoveryStatus::TEvFrontRecoveryStatus(EPhase phase,
NKikimrProto::EReplyStatus status,
const TIntrusivePtr<TPDiskParams> &dsk,
- std::shared_ptr<THugeBlobCtx> hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
TVDiskIncarnationGuid vdiskIncarnationGuid)
: Phase(phase)
, Status(status)
, Dsk(dsk)
- , HugeBlobCtx(std::move(hugeBlobCtx))
+ , MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
, VDiskIncarnationGuid(vdiskIncarnationGuid)
{}
@@ -819,7 +819,7 @@ namespace NKikimr {
TBlobStorageGroupType type = (GInfo ? GInfo->Type : TErasureType::ErasureNone);
VCtx->UpdateCostModel(std::make_unique<TCostModel>(msg->Dsk->SeekTimeUs, msg->Dsk->ReadSpeedBps,
msg->Dsk->WriteSpeedBps, msg->Dsk->ReadBlockSize, msg->Dsk->WriteBlockSize,
- msg->HugeBlobCtx->MinREALHugeBlobInBytes, type));
+ msg->MinREALHugeBlobInBytes, type));
break;
}
case TEvFrontRecoveryStatus::SyncGuidRecoveryDone:
@@ -831,6 +831,12 @@ namespace NKikimr {
}
}
+ void Handle(TEvMinHugeBlobSizeUpdate::TPtr &ev) {
+ VCtx->UpdateCostModel(std::make_unique<TCostModel>(VCtx->CostModel->SeekTimeUs, VCtx->CostModel->ReadSpeedBps,
+ VCtx->CostModel->WriteSpeedBps, VCtx->CostModel->ReadBlockSize, VCtx->CostModel->WriteBlockSize,
+ ev->Get()->MinREALHugeBlobInBytes, VCtx->CostModel->GType));
+ }
+
static NKikimrWhiteboard::EFlag ToLightSignal(NKikimrWhiteboard::EVDiskState st) {
switch (st) {
case NKikimrWhiteboard::EVDiskState::Initial: return NKikimrWhiteboard::EFlag::Yellow;
@@ -2063,6 +2069,7 @@ namespace NKikimr {
HFunc(TEvReportScrubStatus, Handle)
HFunc(NNodeWhiteboard::TEvWhiteboard::TEvVDiskStateUpdate, Handle)
fFunc(TEvBlobStorage::EvForwardToSkeleton, HandleForwardToSkeleton)
+ hFunc(TEvMinHugeBlobSizeUpdate, Handle)
)
#define HFuncStatus(TEvType, status, errorReason, now, wstatus) \
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h
index b5ef914ef06..d5a7c13ad6d 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.h
@@ -10,7 +10,6 @@ namespace NKikimr {
struct TPDiskParams;
class TBlobStorageGroupInfo;
- class THugeBlobCtx;
////////////////////////////////////////////////////////////////////////////
// TEvFrontRecoveryStatus
@@ -25,13 +24,13 @@ namespace NKikimr {
const EPhase Phase;
const NKikimrProto::EReplyStatus Status;
const TIntrusivePtr<TPDiskParams> Dsk;
- const std::shared_ptr<THugeBlobCtx> HugeBlobCtx;
+ ui32 MinREALHugeBlobInBytes;
const TVDiskIncarnationGuid VDiskIncarnationGuid;
TEvFrontRecoveryStatus(EPhase phase,
NKikimrProto::EReplyStatus status,
const TIntrusivePtr<TPDiskParams> &dsk,
- std::shared_ptr<THugeBlobCtx> hugeBlobCtx,
+ ui32 MinREALHugeBlobInBytes,
TVDiskIncarnationGuid vdiskIncarnationGuid);
~TEvFrontRecoveryStatus();
};
diff --git a/ydb/core/cms/console/configs_dispatcher.cpp b/ydb/core/cms/console/configs_dispatcher.cpp
index 395f70fb88d..be2c90555e5 100644
--- a/ydb/core/cms/console/configs_dispatcher.cpp
+++ b/ydb/core/cms/console/configs_dispatcher.cpp
@@ -64,6 +64,7 @@ const THashSet<ui32> DYNAMIC_KINDS({
(ui32)NKikimrConsole::TConfigItem::AllowEditYamlInUiItem,
(ui32)NKikimrConsole::TConfigItem::BackgroundCleaningConfigItem,
(ui32)NKikimrConsole::TConfigItem::TracingConfigItem,
+ (ui32)NKikimrConsole::TConfigItem::BlobStorageConfigItem,
});
const THashSet<ui32> NON_YAML_KINDS({