diff options
author | Alexander Rutkovsky <alexvru@ydb.tech> | 2024-11-12 12:12:33 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 09:12:33 +0000 |
commit | e62a6425cc6cbabb06b09d03cfc8f9b6ad1cb75c (patch) | |
tree | b46a9a151f482775c051980aa8224de24e47bc68 | |
parent | 5f2bf90bebedec6c74fc7f66002b35c585f8060e (diff) | |
download | ydb-e62a6425cc6cbabb06b09d03cfc8f9b6ad1cb75c.tar.gz |
Implement intermixed inplace and huge blobs and add test for this case (#10973)
60 files changed, 1471 insertions, 1189 deletions
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index e7dcbb53a5e..fb48e5185b6 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -738,6 +738,9 @@ struct TEvBlobStorage { EvHugePreCompactResult, EvPDiskMetadataLoaded, EvBalancingSendPartsOnMain, + EvHugeAllocateSlots, + EvHugeAllocateSlotsResult, + EvHugeDropAllocatedSlots, EvYardInitResult = EvPut + 9 * 512, /// 268 636 672 EvLogResult, diff --git a/ydb/core/blobstorage/ut_blobstorage/huge.cpp b/ydb/core/blobstorage/ut_blobstorage/huge.cpp new file mode 100644 index 00000000000..6ec94afe95c --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/huge.cpp @@ -0,0 +1,240 @@ +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> +#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h> +#include <ydb/core/util/lz4_data_generator.h> +#include <ydb/core/erasure/erasure.h> + +namespace { + + class THugeBlobTest { + TEnvironmentSetup Env; + TTestActorSystem& Runtime; + ui32 DataSize = 32_KB; + TString Data = FastGenDataForLZ4(DataSize, 1 /*seed*/); + TLogoBlobID BlobId{1000, 1, 1, 0, DataSize, 0}; + ui32 GroupId; + TIntrusivePtr<TBlobStorageGroupInfo> Info; + TBlobStorageGroupType GType; + TBlobStorageGroupInfo::TVDiskIds VDiskIds; + TBlobStorageGroupInfo::TServiceIds ServiceIds; + std::vector<TActorId> PutQueueIds; + std::vector<TActorId> GetQueueIds; + std::vector<TRope> Parts; + ui8 TestSubgroupNodeId = 6; + + public: + THugeBlobTest() + : Env{{ + .Erasure = TBlobStorageGroupType::Erasure4Plus2Block, + .UseFakeConfigDispatcher = true, + }} + , Runtime(*Env.Runtime) + { + Env.CreateBoxAndPool(1, 1); + + std::vector<ui32> groups = Env.GetGroups(); + UNIT_ASSERT(!groups.empty()); + GroupId = groups.front(); + + Info = Env.GetGroupInfo(GroupId); + GType = Info->Type; + + Info->PickSubgroup(BlobId.Hash(), &VDiskIds, &ServiceIds); + + for (const TVDiskID& vdiskId : VDiskIds) { + PutQueueIds.push_back(Env.CreateQueueActor(vdiskId, NKikimrBlobStorage::PutTabletLog, 0)); + GetQueueIds.push_back(Env.CreateQueueActor(vdiskId, NKikimrBlobStorage::GetFastRead, 0)); + } + + Parts.resize(GType.TotalPartCount()); + const bool success = ErasureSplit(TErasureType::CrcModeNone, GType, TRope(Data), Parts); + UNIT_ASSERT(success); + +// for (ui32 i = 0; i < 6; ++i) { // put main parts +// Put(i, i); +// } + } + + void Put(ui8 subgroupNodeId, ui8 partIdx) { + Cerr << "writing partIdx# " << (int)partIdx << " to " << (int)subgroupNodeId << Endl; + const TActorId queueActorId = PutQueueIds[subgroupNodeId]; + auto edge = Runtime.AllocateEdgeActor(queueActorId.NodeId(), __FILE__, __LINE__); + const TLogoBlobID putId(BlobId, partIdx + 1); + Runtime.Send(new IEventHandle(queueActorId, edge, new TEvBlobStorage::TEvVPut(putId, Parts[partIdx], + VDiskIds[subgroupNodeId], false, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog)), + queueActorId.NodeId()); + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVPutResult>(edge); + auto& record = res->Get()->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK); + } + + void SwitchHugeBlobSize(bool small) { + Cerr << "small blob# " << small << Endl; + + auto ev = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>(); + auto& record = ev->Record; + auto& config = *record.MutableConfig(); + auto& bsConfig = *config.MutableBlobStorageConfig(); + auto& perf = *bsConfig.MutableVDiskPerformanceSettings(); + auto& type = *perf.AddVDiskTypes(); + type.SetPDiskType(NKikimrBlobStorage::EPDiskType::ROT); + type.SetMinHugeBlobSizeInBytes(small ? 4096 : 524288); + + const ui32 serviceNodeId = ServiceIds[TestSubgroupNodeId].NodeId(); + TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__); + Runtime.Send(new IEventHandle(NConsole::MakeConfigsDispatcherID(serviceNodeId), edge, ev.release()), + serviceNodeId); + Env.WaitForEdgeActorEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>(edge); + } + + void CompactFresh() { + Cerr << "compacting fresh" << Endl; + + const TActorId serviceId = ServiceIds[TestSubgroupNodeId]; + const ui32 serviceNodeId = serviceId.NodeId(); + TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__); + Runtime.Send(new IEventHandle(serviceId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs, + TEvCompactVDisk::EMode::FRESH_ONLY)), serviceNodeId); + Env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge); + } + + void CompactLevels() { + Cerr << "compacting levels" << Endl; + + const TActorId serviceId = ServiceIds[TestSubgroupNodeId]; + const ui32 serviceNodeId = serviceId.NodeId(); + TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__); + Runtime.Send(new IEventHandle(serviceId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs, + TEvCompactVDisk::EMode::FULL)), serviceNodeId); + Env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge); + } + + void PutPartsInMask(ui32 mask) { + for (ui32 i = 0; i < GType.TotalPartCount(); ++i) { + if (mask & (1 << i)) { + Put(TestSubgroupNodeId, i); + } + } + } + + void CheckPartsInPlace(ui32 mask, ui32 numDistinctSST) { + Cerr << "checking parts in place mask# " << mask << Endl; + + std::set<ui64> sstIds; + const TActorId serviceId = ServiceIds[TestSubgroupNodeId]; + auto captureRes = Env.SyncQuery<TEvBlobStorage::TEvCaptureVDiskLayoutResult, + TEvBlobStorage::TEvCaptureVDiskLayout>(serviceId); + for (const auto& item : captureRes->Layout) { + using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult; + if (item.Database == T::EDatabase::LogoBlobs) { + Cerr << item.ToString() << Endl; + if (item.RecordType == T::ERecordType::IndexRecord && item.SstId) { + sstIds.insert(item.SstId); + } + } + } + UNIT_ASSERT_VALUES_EQUAL(sstIds.size(), numDistinctSST); + + auto getQuery = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(VDiskIds[TestSubgroupNodeId], + TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, {}, {}, {{BlobId}}); + const TActorId queueId = GetQueueIds[TestSubgroupNodeId]; + const ui32 queueNodeId = queueId.NodeId(); + auto edge = Runtime.AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__); + Runtime.Send(new IEventHandle(queueId, edge, getQuery.release()), queueNodeId); + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(edge); + auto& record = res->Get()->Record; + + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK); + if (!mask) { + UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1); + auto& result = record.GetResult(0); + UNIT_ASSERT_VALUES_EQUAL(LogoBlobIDFromLogoBlobID(result.GetBlobID()), BlobId); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA); + } else { + for (const auto& result : record.GetResult()) { + const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID()); + UNIT_ASSERT_VALUES_EQUAL(id.FullID(), BlobId); + UNIT_ASSERT(id.PartId()); + const ui8 partIdx = id.PartId() - 1; + + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::OK); + UNIT_ASSERT_EQUAL(res->Get()->GetBlobData(result), Parts[partIdx]); + UNIT_ASSERT(mask & (1 << partIdx)); + + mask &= ~(1 << partIdx); + } + } + + UNIT_ASSERT_VALUES_EQUAL(mask, 0); + } + + void RunTest(ui32 fresh1, ui32 fresh2, ui32 huge1, ui32 huge2, bool targetHuge, ui32 fresh3, ui32 huge3, + bool targetHuge2, bool targetHuge3) { + if (fresh1 || fresh2) { + SwitchHugeBlobSize(false); + PutPartsInMask(fresh1); + PutPartsInMask(fresh2); + CheckPartsInPlace(fresh1 | fresh2, 0); + } + if (huge1 || huge2) { + SwitchHugeBlobSize(true); + PutPartsInMask(huge1); + PutPartsInMask(huge2); + CheckPartsInPlace(huge1 | huge2 | fresh1 | fresh2, 0); + } + const ui32 baseMask = fresh1 | fresh2 | huge1 | huge2; + if (baseMask) { + SwitchHugeBlobSize(targetHuge); + CompactFresh(); + CheckPartsInPlace(baseMask, 1); + } + if (const ui32 extraMask = fresh3 | huge3) { + if (fresh3) { + SwitchHugeBlobSize(false); + PutPartsInMask(fresh3); + } + if (huge3) { + SwitchHugeBlobSize(true); + PutPartsInMask(huge3); + } + SwitchHugeBlobSize(targetHuge2); + CompactFresh(); + CheckPartsInPlace(baseMask | extraMask, !!baseMask + !!extraMask); + + SwitchHugeBlobSize(targetHuge3); + CompactLevels(); + CheckPartsInPlace(baseMask | extraMask, 1); + } + } + + static void CompactionTest() { + for (ui32 fresh1 = 0; fresh1 < 8; ++fresh1) { + for (ui32 fresh2 = 0; fresh2 < 2; ++fresh2) { + for (ui32 huge1 = 0; huge1 < 4; ++huge1) { + for (ui32 huge2 = 0; huge2 < 2; ++huge2) { + for (bool targetHuge : {true, false}) { + for (ui32 fresh3 = 0; fresh3 < 4; ++fresh3) { + for (ui32 huge3 = 0; huge3 < 2; ++huge3) { + for (bool targetHuge2 : {true, false}) { + for (bool targetHuge3 : {true, false}) { + Cerr << "fresh1# " << fresh1 << " fresh2# " << fresh2 << " huge1# " << huge1 + << " huge2# " << huge2 << " targetHuge# " << targetHuge + << " fresh3# " << fresh3 + << " huge3# " << huge3 + << " targetHuge2# " << targetHuge2 + << " targetHuge3# " << targetHuge3 + << Endl; + THugeBlobTest test; + test.RunTest(fresh1, fresh2, huge1, huge2, targetHuge, fresh3, huge3, targetHuge2, targetHuge3); + }}}}}}}}} + } + }; + +} + +Y_UNIT_TEST_SUITE(HugeBlobOnlineSizeChange) { + + Y_UNIT_TEST(Compaction) { + THugeBlobTest::CompactionTest(); + } + +} diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h index 08f759acd99..9937d560d1a 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h @@ -28,6 +28,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/system/rusage.h> #include <util/random/fast.h> +#include <util/stream/null.h> using namespace NActors; using namespace NKikimr; diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h index 104fa4986b9..69b783e74db 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h @@ -9,6 +9,8 @@ #include <library/cpp/testing/unittest/registar.h> +static auto& Cconf = Cnull; + struct TEnvironmentSetup { std::unique_ptr<TTestActorSystem> Runtime; static constexpr ui32 DrivesPerNode = 5; @@ -82,7 +84,23 @@ struct TEnvironmentSetup { class TFakeConfigDispatcher : public TActor<TFakeConfigDispatcher> { std::unordered_set<TActorId> Subscribers; - TActorId EdgeId; + + struct TConfigDistributionTask { + TActorId Sender; + ui64 Cookie; + std::unique_ptr<IEventBase> Response; + THashSet<TActorId> RepliesPending; + + TConfigDistributionTask(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) + : Sender(ev->Sender) + , Cookie(ev->Cookie) + , Response(std::make_unique<NConsole::TEvConsole::TEvConfigNotificationResponse>(ev->Get()->Record)) + {} + }; + + std::map<std::tuple<TActorId, ui64>, std::shared_ptr<TConfigDistributionTask>> TasksPending; + ui64 LastCookie = 0; + public: TFakeConfigDispatcher() : TActor<TFakeConfigDispatcher>(&TFakeConfigDispatcher::StateWork) @@ -91,10 +109,11 @@ struct TEnvironmentSetup { STFUNC(StateWork) { switch (ev->GetTypeRewrite()) { - hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle); + hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle) hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle) hFunc(NConsole::TEvConsole::TEvConfigNotificationResponse, Handle) hFunc(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle) + hFunc(TEvents::TEvUndelivered, Handle) } } @@ -103,24 +122,61 @@ struct TEnvironmentSetup { if (items.at(0) != NKikimrConsole::TConfigItem::BlobStorageConfigItem) { return; } + Cconf << "@ subscribed " << ev->Sender << Endl; Subscribers.emplace(ev->Sender); } void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { - EdgeId = ev->Sender; + auto task = std::make_shared<TConfigDistributionTask>(ev); + const ui64 cookie = ++LastCookie; + for (auto& id : Subscribers) { auto update = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>(); update->Record.CopyFrom(ev->Get()->Record); - Send(id, update.Release()); + Send(id, update.Release(), IEventHandle::FlagTrackDelivery, cookie); + task->RepliesPending.insert(id); + const auto [it, inserted] = TasksPending.emplace(std::make_tuple(id, cookie), task); + Y_ABORT_UNLESS(inserted); + } + + Cconf << "@ created task " << cookie << " for " << Subscribers.size() << " subscribers from " << + ev->Sender << " cookie " << ev->Cookie << Endl; + } + + void Handle(TEvents::TEvUndelivered::TPtr& ev) { + Cconf << "@ undelivered " << ev->Sender << " cookie " << ev->Cookie << Endl; + const auto nh = TasksPending.extract(std::make_tuple(ev->Sender, ev->Cookie)); + Y_ABORT_UNLESS(nh); + Finish(*nh.mapped(), ev->Sender); + } + + void Finish(TConfigDistributionTask& task, TActorId sender) { + Cconf << "@ finish from " << sender << " task " << task.Sender << " cookie " << task.Cookie << Endl; + const size_t numErased = task.RepliesPending.erase(sender); + Y_ABORT_UNLESS(numErased); + if (task.RepliesPending.empty()) { + Send(task.Sender, task.Response.release(), 0, task.Cookie); } } void Handle(NConsole::TEvConsole::TEvConfigNotificationResponse::TPtr& ev) { - Forward(ev, EdgeId); + Cconf << "@ TEvConfigNotificationResponse from " << ev->Sender << " cookie " << ev->Cookie << Endl; + const auto nh = TasksPending.extract(std::make_tuple(ev->Sender, ev->Cookie)); + Y_ABORT_UNLESS(nh); + Finish(*nh.mapped(), ev->Sender); } void Handle(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr& ev) { - Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release()); + Cconf << "@ TEvRemoveConfigSubscriptionRequest from " << ev->Sender << Endl; + + for (auto it = TasksPending.lower_bound(std::make_tuple(ev->Sender, 0)); it != TasksPending.end() && + std::get<0>(it->first) == ev->Sender; it = TasksPending.erase(it)) { + Finish(*it->second, ev->Sender); + } + + const size_t numErased = Subscribers.erase(ev->Sender); + Y_ABORT_UNLESS(numErased); + Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release()); } }; diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_huge/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_huge/ya.make new file mode 100644 index 00000000000..4df1d9171d6 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/ut_huge/ya.make @@ -0,0 +1,21 @@ +UNITTEST_FOR(ydb/core/blobstorage/ut_blobstorage) + +FORK_SUBTESTS() + +SIZE(MEDIUM) + +TIMEOUT(600) + +SRCS( + huge.cpp +) + +PEERDIR( + ydb/core/blobstorage/ut_blobstorage/lib +) + +IF (SANITIZER_TYPE) + REQUIREMENTS(ram:32) +ENDIF() + +END() diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make index 23a7b2ff36f..ded5f16757b 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ya.make @@ -64,6 +64,7 @@ RECURSE_FOR_TESTS( ut_blob_depot_fat ut_donor ut_group_reconfiguration + ut_huge ut_read_only_vdisk ut_osiris ut_replication diff --git a/ydb/core/blobstorage/ut_vdisk2/huge.cpp b/ydb/core/blobstorage/ut_vdisk2/huge.cpp index 153a2a8e366..a0153f8dd6f 100644 --- a/ydb/core/blobstorage/ut_vdisk2/huge.cpp +++ b/ydb/core/blobstorage/ut_vdisk2/huge.cpp @@ -32,7 +32,9 @@ Y_UNIT_TEST_SUITE(VDiskTest) { UNIT_ASSERT_VALUES_EQUAL(res.ResultSize(), 1); const auto& value = res.GetResult(0); UNIT_ASSERT_VALUES_EQUAL(value.GetStatus(), NKikimrProto::OK); - UNIT_ASSERT_VALUES_EQUAL(value.GetBufferData(), *datap); + UNIT_ASSERT_EQUAL_C(value.GetBufferData(), *datap, "id# " << id + << " got# " << (int)value.GetBufferData().front() + << " expected# " << (int)datap->front()); } }; @@ -61,17 +63,19 @@ Y_UNIT_TEST_SUITE(VDiskTest) { const ui64 tabletId = tabletIds[RandomNumber(tabletIds.size())]; TTabletContext& tablet = tablets[tabletId]; - TString& data = blobValues[RandomNumber(blobValues.size())]; + size_t blobValueIndex = RandomNumber(blobValues.size()); + TString& data = blobValues[blobValueIndex]; TLogoBlobID id(tabletId, tablet.Gen, tablet.Step++, channel, data.size(), 0, 1); auto res = env->Put(id, data); UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), NKikimrProto::OK); - Cerr << "Put id# " << id << " totalSize# " << totalSize << Endl; + Cerr << "Put id# " << id << " totalSize# " << totalSize << " blobValueIndex# " << blobValueIndex << Endl; - content.emplace(id, &data); + const auto [it, inserted] = content.emplace(id, &data); + UNIT_ASSERT(inserted); totalSize += data.size(); - if (RandomNumber(1000u) < 3) { + if (RandomNumber(1000u) < 100) { ui32 minHugeBlobValue; do { minHugeBlobValue = minHugeBlobValues[RandomNumber(minHugeBlobValues.size())]; diff --git a/ydb/core/blobstorage/vdisk/balance/handoff_map.h b/ydb/core/blobstorage/vdisk/balance/handoff_map.h index ddb8f2b5103..3dd5cb47862 100644 --- a/ydb/core/blobstorage/vdisk/balance/handoff_map.h +++ b/ydb/core/blobstorage/vdisk/balance/handoff_map.h @@ -37,23 +37,6 @@ namespace NKikimr { } }; - //////////////// Transformed Item ////////////////////////////////////// - struct TTransformedItem { - TKey Key; - const TMemRec *MemRec; - const TDataMerger *DataMerger; - - // intermediate data - TMemRec NewMemRec; // new mem rec is build here if required - TDataMerger NewDataMerger; // new DataMerger, if we need to rebuild it - - TTransformedItem(); - const TTransformedItem *SetRaw(const TKey &key, const TMemRec *memRec, const TDataMerger *dataMerger); - const TTransformedItem *SetNewDisk(const TKey &key, const TIngress &ingress, TDataMerger &dataMerger); - const TTransformedItem *SetRmData(const TKey &key, const TMemRec *memRec, const TDataMerger *dataMerger); - }; - - THandoffMap(const THullCtxPtr &hullCtx, bool runHandoff, const TActorId &skeletonId) @@ -61,10 +44,6 @@ namespace NKikimr { , Top(HullCtx->VCtx->Top) , RunHandoff(runHandoff) , SkeletonId(skeletonId) - , DelMap() - , Counter(0) - , TrRes() - , Stat() { } @@ -80,15 +59,10 @@ namespace NKikimr { // Transforms record according to the built handoff map. It returns item we need to write, pointer is // valid until next call. Nullptr indicates that item is to be removed completely. - const TTransformedItem *Transform(const TKey& key, const TMemRec* memRec, - const TDataMerger* dataMerger, bool /*keepData*/, bool keepItem) { + void Transform(const TKey& /*key*/, TMemRec& /*memRec*/, TDataMerger& dataMerger, bool /*keepData*/) { // do nothing by default, all work is done in template specialization for logo blobs Counter++; - if (!keepItem) { - return nullptr; - } - Y_DEBUG_ABORT_UNLESS(dataMerger->Empty()); - return TrRes.SetRaw(key, memRec, dataMerger); + Y_DEBUG_ABORT_UNLESS(dataMerger.Empty()); } private: @@ -97,166 +71,61 @@ namespace NKikimr { const bool RunHandoff; const TActorId SkeletonId; - TDeque<ui8> DelMap; - unsigned Counter; - TTransformedItem TrRes; + std::vector<ui8> DelMap; + size_t Counter = 0; TStat Stat; }; - - //////////////////////////////////////////////////////////////////////////// - // TTransformedItem implementation - //////////////////////////////////////////////////////////////////////////// - template <class TKey, class TMemRec> - THandoffMap<TKey, TMemRec>::TTransformedItem::TTransformedItem() - : Key() - , MemRec(nullptr) - , DataMerger(nullptr) - , NewMemRec() - , NewDataMerger() - {} - - template <class TKey, class TMemRec> - const typename THandoffMap<TKey, TMemRec>::TTransformedItem * - THandoffMap<TKey, TMemRec>::TTransformedItem::SetRaw(const TKey &key, const TMemRec *memRec, - const TDataMerger *dataMerger) { - Key = key; - MemRec = memRec; - DataMerger = dataMerger; - NewMemRec.SetNoBlob(); - NewDataMerger.Clear(); - return this; - } - - template <class TKey, class TMemRec> - const typename THandoffMap<TKey, TMemRec>::TTransformedItem * - THandoffMap<TKey, TMemRec>::TTransformedItem::SetNewDisk(const TKey &key, const TIngress &ingress, - TDataMerger &dataMerger) { - NewMemRec = TMemRecLogoBlob(ingress); - NewDataMerger.Swap(dataMerger); - NewMemRec.SetType(NewDataMerger.GetType()); - - Key = key; - MemRec = &NewMemRec; - DataMerger = &NewDataMerger; - return this; - } - - template <class TKey, class TMemRec> - const typename THandoffMap<TKey, TMemRec>::TTransformedItem * - THandoffMap<TKey, TMemRec>::TTransformedItem::SetRmData(const TKey &key, const TMemRec *memRec, - const TDataMerger *dataMerger) { - NewMemRec = TMemRecLogoBlob(memRec->GetIngress()); - NewDataMerger.SetEmptyFromAnotherMerger(dataMerger); - NewMemRec.SetType(NewDataMerger.GetType()); - - Key = key; - MemRec = &NewMemRec; - DataMerger = &NewDataMerger; - return this; - } - - - //////////////////////////////////////////////////////////////////////////// - // Template specialization for LogoBlobs - //////////////////////////////////////////////////////////////////////////// - template <> - inline const THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::TTransformedItem * - THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::Transform( - const TKeyLogoBlob& key, - const TMemRecLogoBlob* memRec, - const TDataMerger* dataMerger, - bool keepData, - bool keepItem) - { + template<> + inline void THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::Transform(const TKeyLogoBlob& key, TMemRecLogoBlob& memRec, + TDataMerger& dataMerger, bool keepData) { Y_DEFER { Counter++; }; - if (!keepItem) { - return nullptr; - } - if (!keepData) { - return TrRes.SetRmData(key, memRec, dataMerger); + memRec.SetNoBlob(); + memRec.ClearLocalParts(Top->GType); + dataMerger.MakeEmpty(); // collect all huge blobs in the merger + return; } - const TTransformedItem *defaultResult = TrRes.SetRaw(key, memRec, dataMerger); // unchanged by default if (!RunHandoff) { - return defaultResult; + return; } Y_VERIFY(Counter < DelMap.size()); - TIngress ingress = memRec->GetIngress(); // ingress we are going to change - NMatrix::TVectorType localParts = ingress.LocalParts(Top->GType); + TIngress ingress = memRec.GetIngress(); // ingress we are going to change ui8 vecSize = Top->GType.TotalPartCount(); - const NMatrix::TVectorType delPlan(DelMap.at(Counter), vecSize); - const NMatrix::TVectorType delVec = delPlan & localParts; + const NMatrix::TVectorType delPlan(DelMap[Counter], vecSize); + const NMatrix::TVectorType delVec = delPlan & ingress.LocalParts(Top->GType); if (delVec.Empty()) { - return defaultResult; + return; } // mark deleted handoff parts in ingress - for (ui8 i = localParts.FirstPosition(); i != localParts.GetSize(); i = localParts.NextPosition(i)) { - if (delVec.Get(i)) { - const ui8 partId = i + 1; - TLogoBlobID id(key.LogoBlobID(), partId); - ingress.DeleteHandoff(Top.get(), HullCtx->VCtx->ShortSelfVDisk, id, true); - localParts.Clear(i); - } + for (ui8 i : delVec) { + // this clears local bit too + ingress.DeleteHandoff(Top.get(), HullCtx->VCtx->ShortSelfVDisk, TLogoBlobID(key.LogoBlobID(), i + 1), true); } - if (localParts.Empty()) { - // we have deleted all parts, we can remove the record completely - return nullptr; - } + // update merger with the filtered parts (only remaining local parts are kept) + TBlobType::EType type; + ui32 inplacedDataSize = 0; + dataMerger.FilterLocalParts(ingress.LocalParts(Top->GType), key.LogoBlobID(), &type, &inplacedDataSize); - TDataMerger newMerger; - switch (memRec->GetType()) { - case TBlobType::DiskBlob: { - const TDiskBlob& blob = dataMerger->GetDiskBlobMerger().GetDiskBlob(); - - // create new blob from kept parts - for (auto it = blob.begin(); it != blob.end(); ++it) { - if (localParts.Get(it.GetPartId() - 1)) { - newMerger.AddPart(blob, it); - } - } - break; - } - case TBlobType::HugeBlob: - case TBlobType::ManyHugeBlobs: { - const auto& oldMerger = dataMerger->GetHugeBlobMerger(); - - auto parts = oldMerger.GetParts(); - Y_DEBUG_ABORT_UNLESS(oldMerger.SavedData().size() == parts.CountBits()); - Y_DEBUG_ABORT_UNLESS(oldMerger.SavedData().size() == oldMerger.GetCircaLsns().size()); - - for (ui8 i = parts.FirstPosition(), j = 0; i != parts.GetSize(); i = parts.NextPosition(i), ++j) { - if (localParts.Get(i)) { - auto curOneHotPart = NMatrix::TVectorType::MakeOneHot(i, parts.GetSize()); - newMerger.AddHugeBlob(&oldMerger.SavedData()[j], &oldMerger.SavedData()[j] + 1, curOneHotPart, oldMerger.GetCircaLsns()[j]); - } else { - newMerger.AddDeletedHugeBlob(oldMerger.SavedData()[j]); - } - } - break; - } - default: { - Y_DEBUG_ABORT_UNLESS(false); - return defaultResult; - } + // reinstate memRec + memRec = TMemRecLogoBlob(ingress); + memRec.SetType(type); + if (type == TBlobType::DiskBlob && inplacedDataSize) { // update inplaced data size for inplace blob + memRec.SetDiskBlob(TDiskPart(0, 0, inplacedDataSize)); } - // SetNewDisk can handle empty dataMerger correctly. - // If dataMerger is empty, we still keep the record, it contains knowledge about - // this logoblob, only garbage collection removes records completely - return TrRes.SetNewDisk(key, ingress, newMerger); + Y_ABORT_UNLESS(memRec.GetLocalParts(Top->GType) == dataMerger.GetParts()); } - - template <> - template <class TIterator> + template<> + template<class TIterator> inline void THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::BuildMap( const TLevelIndexSnapshot& levelSnap, const TIterator& i) diff --git a/ydb/core/blobstorage/vdisk/common/disk_part.h b/ydb/core/blobstorage/vdisk/common/disk_part.h index 25faf131bcf..f06c23c7ec1 100644 --- a/ydb/core/blobstorage/vdisk/common/disk_part.h +++ b/ydb/core/blobstorage/vdisk/common/disk_part.h @@ -258,3 +258,7 @@ struct THash<NKikimr::TDiskPart> { } }; +template<> +inline void Out<NKikimr::TDiskPart>(IOutputStream& s, const NKikimr::TDiskPart& x) { + s << x.ToString(); +} diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp index d9fa9ee0144..540edfaac51 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp @@ -39,6 +39,7 @@ namespace NKikimr { HullCompLevelRateThreshold = 1.0; HullCompFreeSpaceThreshold = 2.0; FreshCompMaxInFlightWrites = 10; + FreshCompMaxInFlightReads = 10; // when moving huge blobs HullCompMaxInFlightWrites = 10; HullCompMaxInFlightReads = 20; HullCompReadBatchEfficiencyThreshold = 0.5; // don't issue reads if there are more gaps than the useful data diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.h b/ydb/core/blobstorage/vdisk/common/vdisk_config.h index 3632857e29d..dd80ff86cef 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_config.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.h @@ -128,6 +128,7 @@ namespace NKikimr { double HullCompLevelRateThreshold; double HullCompFreeSpaceThreshold; ui32 FreshCompMaxInFlightWrites; + ui32 FreshCompMaxInFlightReads; ui32 HullCompMaxInFlightWrites; ui32 HullCompMaxInFlightReads; double HullCompReadBatchEfficiencyThreshold; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 9a6962b9db3..b99db787e0b 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -3073,6 +3073,15 @@ namespace NKikimr { TLogoBlobID BlobId; // for HugeBlob/InplaceBlob ui64 SstId; // for IndexRecord ui32 Level; // for IndexRecord + + TString ToString() const { + return TStringBuilder() << "{Location# " << Location.ToString() + << " Database# " << (int)Database + << " RecordType# " << (int)RecordType + << " BlobId# " << BlobId.ToString() + << " SstId# " << SstId + << " Level# " << Level << '}'; + } }; std::vector<TLayoutRecord> Layout; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h index 1c08c0be927..5a26e1e16ff 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h @@ -64,7 +64,6 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// class THugeBlobCtx { public: - // this value is multiply of AppendBlockSize and is calculated from Config->MinHugeBlobSize const std::shared_ptr<const THugeSlotsMap> HugeSlotsMap; const bool AddHeader; diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h index baf4290bd9c..be569ee9868 100644 --- a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h +++ b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h @@ -78,6 +78,7 @@ namespace NKikimr { , Barriers(FullSnap.BarriersSnap.CreateEssence(FullSnap.HullCtx)) , AllowKeepFlags(FullSnap.HullCtx->AllowKeepFlags) , Iter(FullSnap.HullCtx, &FullSnap.LogoBlobsSnap) + , Merger(GType, false /* addHeader doesn't really matter here */) { Iter.SeekToFirst(); } @@ -115,26 +116,22 @@ namespace NKikimr { } void Finish() { + TBlobType::EType type; + ui32 inplacedDataSize; + Merger.Finish(true, Key.LogoBlobID(), &type, &inplacedDataSize); if (!Merger.Empty()) { - Y_ABORT_UNLESS(!Merger.HasSmallBlobs()); NGc::TKeepStatus status = Barriers->Keep(Key, MemRec, {}, AllowKeepFlags, true /*allowGarbageCollection*/); - const auto& hugeMerger = Merger.GetHugeBlobMerger(); - const auto& local = MemRec.GetIngress().LocalParts(GType); - ui8 partIdx = local.FirstPosition(); - for (const TDiskPart& part : hugeMerger.SavedData()) { - Y_ABORT_UNLESS(partIdx != local.GetSize()); - if (part.ChunkIdx) { + for (const TDiskPart& part : Merger.GetSavedHugeBlobs()) { + if (!part.Empty()) { static_cast<TDerived&>(*this).Add(part, Key.LogoBlobID(), status.KeepData); } - partIdx = local.NextPosition(partIdx); } - for (const TDiskPart& part : hugeMerger.DeletedData()) { - if (part.ChunkIdx) { - static_cast<TDerived&>(*this).Add(part, Key.LogoBlobID(), false); - } + for (const TDiskPart& part : Merger.GetDeletedHugeBlobs()) { + Y_ABORT_UNLESS(!part.Empty()); + static_cast<TDerived&>(*this).Add(part, Key.LogoBlobID(), false); } - Merger.Clear(); } + Merger.Clear(); } void Update(const TMemRecLogoBlob &memRec, const TDiskPart *outbound, ui64 lsn) { diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp index 4337654c227..8e406994529 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp @@ -285,7 +285,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); //////////////////////////////////////////////////////////////////////////// class THullHugeBlobChunkAllocator : public TActorBootstrapped<THullHugeBlobChunkAllocator> { std::shared_ptr<THugeKeeperCtx> HugeKeeperCtx; - const TActorId NotifyID; + TActorId ParentId; ui64 Lsn; std::shared_ptr<THullHugeKeeperPersState> Pers; ui32 ChunkId = 0; @@ -294,7 +294,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); friend class TActorBootstrapped<THullHugeBlobChunkAllocator>; - void Bootstrap(const TActorContext &ctx) { + void Bootstrap(TActorId parentId, const TActorContext &ctx) { + ParentId = parentId; // reserve chunk LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "ChunkAllocator: bootstrap")); @@ -348,7 +349,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "ChunkAllocator: committed:" " chunkId# %" PRIu32 " LsnSeg# %" PRIu64, ChunkId, Lsn)); - ctx.Send(NotifyID, new TEvHullHugeChunkAllocated(ChunkId, SlotSize)); + ctx.Send(ParentId, new TEvHullHugeChunkAllocated(ChunkId, SlotSize)); Die(ctx); Span.EndOk(); } @@ -371,10 +372,9 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); return NKikimrServices::TActivity::BS_HULL_HUGE_BLOB_CHUNKALLOC; } - THullHugeBlobChunkAllocator(std::shared_ptr<THugeKeeperCtx> hugeKeeperCtx, const TActorId ¬ifyID, + THullHugeBlobChunkAllocator(std::shared_ptr<THugeKeeperCtx> hugeKeeperCtx, std::shared_ptr<THullHugeKeeperPersState> pers, NWilson::TTraceId traceId, ui32 slotSize) : HugeKeeperCtx(std::move(hugeKeeperCtx)) - , NotifyID(notifyID) , Pers(std::move(pers)) , Span(TWilson::VDiskTopLevel, std::move(traceId), "VDisk.HullHugeBlobChunkAllocator") , SlotSize(slotSize) @@ -705,8 +705,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); return true; } else if (AllocatingChunkPerSlotSize.insert(slotSize).second) { LWTRACK(HugeBlobChunkAllocatorStart, ev->Get()->Orbit); - auto aid = ctx.RegisterWithSameMailbox(new THullHugeBlobChunkAllocator(HugeKeeperCtx, ctx.SelfID, - State.Pers, std::move(traceId), slotSize)); + auto aid = ctx.RegisterWithSameMailbox(new THullHugeBlobChunkAllocator(HugeKeeperCtx, State.Pers, + std::move(traceId), slotSize)); ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); } if (putToQueue) { @@ -871,14 +871,14 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); const size_t numErased = AllocatingChunkPerSlotSize.erase(msg->SlotSize); Y_ABORT_UNLESS(numErased == 1); ActiveActors.Erase(ev->Sender); + ProcessAllocateSlotTasks(msg->SlotSize, ctx); ProcessQueue(msg->SlotSize, ctx); } void Handle(TEvHullFreeHugeSlots::TPtr &ev, const TActorContext &ctx) { TEvHullFreeHugeSlots *msg = ev->Get(); - Y_ABORT_UNLESS(!msg->HugeBlobs.Empty()); - if (CheckPendingWrite(msg->WId, ev, msg->DeletionLsn, "FreeHugeSlots")) { + if (msg->WId && CheckPendingWrite(msg->WId, ev, msg->DeletionLsn, "FreeHugeSlots")) { return; } @@ -887,6 +887,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); "THullHugeKeeper: TEvHullFreeHugeSlots: %s", msg->ToString().data())); THashSet<ui32> slotSizes; + for (const auto &x : msg->HugeBlobs) { slotSizes.insert(State.Pers->Heap->SlotSizeOfThisSize(x.Size)); NHuge::TFreeRes freeRes = State.Pers->Heap->Free(x); @@ -898,6 +899,11 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); ++State.ItemsAfterCommit; } + for (const TDiskPart& x : msg->AllocatedBlobs) { + const bool deleted = State.Pers->DeleteSlotInFlight(State.Pers->Heap->ConvertDiskPartToHugeSlot(x)); + Y_ABORT_UNLESS(deleted); + } + auto checkAndSet = [this, msg] (ui64 &dbLsn) { ui64 origRecoveredLsn = HugeKeeperCtx->LsnMngr->GetOriginallyRecoveredLsn(); Y_VERIFY_S(dbLsn <= msg->DeletionLsn, HugeKeeperCtx->VCtx->VDiskLogPrefix << " Check failed:" @@ -1058,6 +1064,103 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); HugeKeeperCtx->VCtx->GetHugeHeapFragmentation().Set(stat.CurrentlyUsedChunks, stat.CanBeFreedChunks); } + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + struct TAllocateSlotsTask { + TActorId Sender; + ui64 Cookie; + std::vector<ui32> BlobSizes; + std::vector<TDiskPart> Result; + THashMap<ui32, TDynBitMap> Pending; + + TAllocateSlotsTask(TEvHugeAllocateSlots::TPtr& ev) + : Sender(ev->Sender) + , Cookie(ev->Cookie) + , BlobSizes(std::move(ev->Get()->BlobSizes)) + , Result(BlobSizes.size()) + {} + }; + + std::set<std::tuple<ui32, std::shared_ptr<TAllocateSlotsTask>>> SlotSizeToTask; + + void TryToFulfillTask(const std::shared_ptr<TAllocateSlotsTask>& task, ui32 slotSizeToProcess, const TActorContext& ctx) { + bool done = true; + + auto processItem = [&](size_t index, TDynBitMap *pending) { + auto& result = task->Result[index]; + Y_DEBUG_ABORT_UNLESS(result.Empty()); + + NHuge::THugeSlot hugeSlot; + ui32 slotSize; + if (State.Pers->Heap->Allocate(task->BlobSizes[index], &hugeSlot, &slotSize)) { + State.Pers->AddSlotInFlight(hugeSlot); + State.Pers->AddChunkSize(hugeSlot); + result = hugeSlot.GetDiskPart(); + if (pending) { + Y_DEBUG_ABORT_UNLESS(pending->Get(index)); + pending->Reset(index); + } + } else { + if (AllocatingChunkPerSlotSize.insert(slotSize).second) { + auto aid = ctx.RegisterWithSameMailbox(new THullHugeBlobChunkAllocator(HugeKeeperCtx, + State.Pers, {}, slotSize)); + ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); + } + done = false; + SlotSizeToTask.emplace(slotSize, task); + if (pending) { + Y_DEBUG_ABORT_UNLESS(pending->Get(index)); + return false; + } + task->Pending[slotSize].Set(index); + } + return true; + }; + + if (slotSizeToProcess) { + const auto it = task->Pending.find(slotSizeToProcess); + Y_ABORT_UNLESS(it != task->Pending.end()); + auto& pending = it->second; + Y_FOR_EACH_BIT(index, pending) { + if (!processItem(index, &pending)) { + break; + } + } + } else { + for (size_t i = 0; i < task->BlobSizes.size(); ++i) { + processItem(i, nullptr); + } + } + + if (done) { + LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLHUGE, HugeKeeperCtx->VCtx->VDiskLogPrefix + << "TEvHugeAllocateSlotsResult# " << FormatList(task->Result)); + Send(task->Sender, new TEvHugeAllocateSlotsResult(std::move(task->Result)), 0, task->Cookie); + } + } + + void ProcessAllocateSlotTasks(ui32 slotSize, const TActorContext& ctx) { + auto it = SlotSizeToTask.lower_bound(std::make_tuple(slotSize, nullptr)); + while (it != SlotSizeToTask.end() && std::get<0>(*it) == slotSize) { + auto node = SlotSizeToTask.extract(it++); + TryToFulfillTask(std::get<1>(node.value()), slotSize, ctx); + } + } + + void Handle(TEvHugeAllocateSlots::TPtr ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLHUGE, HugeKeeperCtx->VCtx->VDiskLogPrefix + << "TEvHugeAllocateSlots# " << FormatList(ev->Get()->BlobSizes)); + TryToFulfillTask(std::make_shared<TAllocateSlotsTask>(ev), 0, ctx); + } + + void Handle(TEvHugeDropAllocatedSlots::TPtr ev, const TActorContext& /*ctx*/) { + for (const auto& p : ev->Get()->Locations) { + State.Pers->Heap->Free(p); + const bool deleted = State.Pers->DeleteSlotInFlight(State.Pers->Heap->ConvertDiskPartToHugeSlot(p)); + Y_ABORT_UNLESS(deleted); + } + } + //////////// Event Handlers //////////////////////////////////// STFUNC(StateFunc) { @@ -1070,6 +1173,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); HFunc(TEvHullHugeCommitted, Handle) HFunc(TEvHullHugeWritten, Handle) HFunc(TEvHullHugeBlobLogged, Handle) + HFunc(TEvHugeAllocateSlots, Handle) + HFunc(TEvHugeDropAllocatedSlots, Handle) HFunc(TEvHugePreCompact, Handle) HFunc(TEvHugeLockChunks, Handle) HFunc(TEvHugeUnlockChunks, Handle) diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h index e3a09814f16..aaa1ede6d3e 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h @@ -131,12 +131,15 @@ namespace NKikimr { class TEvHullFreeHugeSlots : public TEventLocal<TEvHullFreeHugeSlots, TEvBlobStorage::EvHullFreeHugeSlots> { public: const TDiskPartVec HugeBlobs; + const TDiskPartVec AllocatedBlobs; const ui64 DeletionLsn; const TLogSignature Signature; // identifies database we send update for const ui64 WId; - TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn, TLogSignature signature, ui64 wId) + TEvHullFreeHugeSlots(TDiskPartVec&& hugeBlobs, TDiskPartVec&& allocatedBlobs, ui64 deletionLsn, + TLogSignature signature, ui64 wId) : HugeBlobs(std::move(hugeBlobs)) + , AllocatedBlobs(std::move(allocatedBlobs)) , DeletionLsn(deletionLsn) , Signature(signature) , WId(wId) @@ -145,7 +148,7 @@ namespace NKikimr { TString ToString() const { TStringStream str; str << "{" << Signature.ToString() << " DelLsn# " << DeletionLsn << " Slots# " << HugeBlobs.ToString() - << " WId# " << WId << "}"; + << " Allocated# " << AllocatedBlobs.ToString() << " WId# " << WId << "}"; return str.Str(); } }; @@ -216,6 +219,30 @@ namespace NKikimr { TEvHugePreCompactResult(ui64 wId) : WId(wId) {} }; + struct TEvHugeAllocateSlots : TEventLocal<TEvHugeAllocateSlots, TEvBlobStorage::EvHugeAllocateSlots> { + std::vector<ui32> BlobSizes; + + TEvHugeAllocateSlots(std::vector<ui32> blobSizes) + : BlobSizes(std::move(blobSizes)) + {} + }; + + struct TEvHugeAllocateSlotsResult : TEventLocal<TEvHugeAllocateSlotsResult, TEvBlobStorage::EvHugeAllocateSlotsResult> { + std::vector<TDiskPart> Locations; + + TEvHugeAllocateSlotsResult(std::vector<TDiskPart> locations) + : Locations(std::move(locations)) + {} + }; + + struct TEvHugeDropAllocatedSlots : TEventLocal<TEvHugeDropAllocatedSlots, TEvBlobStorage::EvHugeDropAllocatedSlots> { + std::vector<TDiskPart> Locations; + + TEvHugeDropAllocatedSlots(std::vector<TDiskPart> locations) + : Locations(std::move(locations)) + {} + }; + //////////////////////////////////////////////////////////////////////////// // THugeKeeperCtx //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h index ef5ce75f88b..b1beef6571f 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h @@ -207,169 +207,6 @@ namespace NKikimr { TString ToString() const; }; - - //////////////////////////////////////////////////////////////////////////// - // TBlobMerger - //////////////////////////////////////////////////////////////////////////// - class TBlobMerger { - using TCircaLsns = TVector<ui64>; - - public: - TBlobMerger() = default; - - void Clear() { - DiskPtrs.clear(); - Deleted.clear(); - Parts.Clear(); - CircaLsns.clear(); - } - - void SetEmptyFromAnotherMerger(const TBlobMerger *fromMerger) { - Clear(); - Deleted.insert(Deleted.end(), fromMerger->SavedData().begin(), fromMerger->SavedData().end()); - Deleted.insert(Deleted.end(), fromMerger->DeletedData().begin(), fromMerger->DeletedData().end()); - } - - void Add(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType &parts, ui64 circaLsn) { - if (DiskPtrs.empty()) { - Parts = parts; - DiskPtrs = {begin, end}; - CircaLsns = TCircaLsns(end - begin, circaLsn); - Y_ABORT_UNLESS(DiskPtrs.size() == Parts.CountBits()); - } else { - Merge(begin, end, parts, circaLsn); - } - } - - void AddDeletedPart(const TDiskPart &part) { - Deleted.push_back(part); - } - - void AddMetadataParts(NMatrix::TVectorType parts) { - // this is special case for mirror3of4, where data can have empty TDiskPart - std::array<TDiskPart, 8> zero; - zero.fill(TDiskPart()); - // empty TDiskPart at a position means that every circaLsn shoud work - const ui64 circaLsn = Max<ui64>(); - Merge(zero.begin(), zero.begin() + parts.CountBits(), parts, circaLsn); - } - - bool Empty() const { - return Parts.Empty(); - } - - void Swap(TBlobMerger &m) { - DiskPtrs.swap(m.DiskPtrs); - Deleted.swap(m.Deleted); - Parts.Swap(m.Parts); - CircaLsns.swap(m.CircaLsns); - } - - TBlobType::EType GetBlobType() const { - Y_ABORT_UNLESS(!Empty()); - return DiskPtrs.size() == 1 ? TBlobType::HugeBlob : TBlobType::ManyHugeBlobs; - } - - ui32 GetNumParts() const { - return DiskPtrs.size(); - } - - void Output(IOutputStream &str) const { - if (Empty()) { - str << "empty"; - } else { - str << "{Parts# " << Parts.ToString(); - str << " CircaLsns# " << FormatList(CircaLsns); - str << " DiskPtrs# "; - FormatList(str, DiskPtrs); - str << " Deleted# "; - FormatList(str, Deleted); - str << "}"; - } - } - - TString ToString() const { - TStringStream str; - Output(str); - return str.Str(); - } - - const NMatrix::TVectorType& GetParts() const { - return Parts; - } - - const TVector<TDiskPart> &SavedData() const { - return DiskPtrs; - } - - const TVector<TDiskPart> &DeletedData() const { - return Deleted; - } - - const TCircaLsns &GetCircaLsns() const { - return CircaLsns; - } - - private: - typedef TVector<TDiskPart> TDiskPtrs; - - TDiskPtrs DiskPtrs; - TDiskPtrs Deleted; - NMatrix::TVectorType Parts; - TCircaLsns CircaLsns; - - void Merge(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType &parts, ui64 circaLsn) - { - Y_ABORT_UNLESS(end - begin == parts.CountBits()); - Y_DEBUG_ABORT_UNLESS(Parts.GetSize() == parts.GetSize()); - const ui8 maxSize = parts.GetSize(); - TDiskPtrs newDiskPtrs; - TCircaLsns newCircaLsns; - newDiskPtrs.reserve(maxSize); - newCircaLsns.reserve(maxSize); - - TDiskPtrs::const_iterator locBegin = DiskPtrs.begin(); - TDiskPtrs::const_iterator locEnd = DiskPtrs.end(); - TDiskPtrs::const_iterator locIt = locBegin; - const TDiskPart *it = begin; - - for (ui8 i = 0; i < maxSize; i++) { - if (Parts.Get(i) && parts.Get(i)) { - // both - Y_DEBUG_ABORT_UNLESS(locIt != locEnd && it != end); - if (CircaLsns[locIt - locBegin] < circaLsn) { - // incoming value wins - newDiskPtrs.push_back(*it); - newCircaLsns.push_back(circaLsn); - Deleted.push_back(*locIt); - } else { - // already seen value wins - newDiskPtrs.push_back(*locIt); - newCircaLsns.push_back(CircaLsns[locIt - locBegin]); - Deleted.push_back(*it); - } - ++locIt; - ++it; - } else if (Parts.Get(i)) { - Y_DEBUG_ABORT_UNLESS(locIt != locEnd); - newDiskPtrs.push_back(*locIt); - newCircaLsns.push_back(CircaLsns[locIt - locBegin]); - ++locIt; - } else if (parts.Get(i)) { - Y_DEBUG_ABORT_UNLESS(it != end); - newDiskPtrs.push_back(*it); - newCircaLsns.push_back(circaLsn); - ++it; - } - } - - Parts |= parts; - DiskPtrs.swap(newDiskPtrs); - CircaLsns.swap(newCircaLsns); - Y_ABORT_UNLESS(DiskPtrs.size() == Parts.CountBits()); - } - }; - } // NHuge } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs_ut.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs_ut.cpp deleted file mode 100644 index 9212c2fc301..00000000000 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs_ut.cpp +++ /dev/null @@ -1,63 +0,0 @@ -#include "blobstorage_hullhugedefs.h" -#include <library/cpp/testing/unittest/registar.h> - -#include <util/stream/null.h> - -#define STR Cnull - - -namespace NKikimr { - - using namespace NHuge; - - Y_UNIT_TEST_SUITE(TBlobStorageHullHugeBlobMerger) { - - Y_UNIT_TEST(Basic) { - TBlobMerger merger; - - const ui32 blobSize = 128u << 10u; - NMatrix::TVectorType parts1(0, 6); - parts1.Set(1); - parts1.Set(3); - TDiskPart diskParts1[2] = {{1, blobSize * 2u, blobSize}, {2, blobSize * 4u, blobSize}}; - - NMatrix::TVectorType parts2(0, 6); - parts2.Set(2); - TDiskPart diskParts2[1] = {{3, blobSize * 6u, blobSize}}; - - NMatrix::TVectorType parts3(0, 6); - parts3.Set(2); - parts3.Set(3); - TDiskPart diskParts3[2] = {{4, blobSize * 8u, blobSize}, {5, blobSize * 10u, blobSize}}; - - - merger.Add(diskParts1, diskParts1 + 2, parts1, 4); // circaLsn = 4 - STR << merger.ToString() << "\n"; - TVector<TDiskPart> res1 = {{1, blobSize * 2u, blobSize}, {2, blobSize * 4u, blobSize}}; - TVector<ui64> sstIds1{4, 4}; - UNIT_ASSERT(merger.GetCircaLsns() == sstIds1); - UNIT_ASSERT(merger.SavedData() == res1); - UNIT_ASSERT(merger.DeletedData().empty()); - - merger.Add(diskParts2, diskParts2 + 1, parts2, 6); // circaLsn = 6 - STR << merger.ToString() << "\n"; - TVector<TDiskPart> res2 = {{1, blobSize * 2u, blobSize}, {3, blobSize * 6u, blobSize}, - {2, blobSize * 4u, blobSize}}; - TVector<ui64> sstIds2{4, 6, 4}; - UNIT_ASSERT(merger.GetCircaLsns() == sstIds2); - UNIT_ASSERT(merger.SavedData() == res2); - UNIT_ASSERT(merger.DeletedData().empty()); - - merger.Add(diskParts3, diskParts3 + 2, parts3, 5); // circaLsn = 5 - STR << merger.ToString() << "\n"; - TVector<TDiskPart> res3 = {{1, blobSize * 2u, blobSize}, {3, blobSize * 6u, blobSize}, - {5, blobSize * 10u, blobSize}}; - TVector<TDiskPart> deleted3 = {{4, blobSize * 8u, blobSize}, {2, blobSize * 4u, blobSize}}; - TVector<ui64> sstIds3{4, 6, 5}; - UNIT_ASSERT(merger.GetCircaLsns() == sstIds3); - UNIT_ASSERT(merger.SavedData() == res3); - UNIT_ASSERT(merger.DeletedData() == deleted3); - } - } - -} // NKikimr diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp index f874b1c5d58..898271ef6d4 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp @@ -428,6 +428,7 @@ namespace NKikimr { const TActorContext &ctx, ui64 lsn, const TDiskPartVec &rec, + const TDiskPartVec& allocated, ESlotDelDbType type) { ui64 *logPosDelLsn = nullptr; @@ -446,25 +447,24 @@ namespace NKikimr { } if (lsn > *logPosDelLsn) { // apply - LOG_DEBUG(ctx, BS_HULLHUGE, - VDISKP(VCtx->VDiskLogPrefix, - "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 " entryLsn# %" PRIu64 "): " - "RmHugeBlobs apply: %s", - Guid, lsn, LogPos.EntryPointLsn, rec.ToString().data())); + LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 + " entryLsn# %" PRIu64 "): " "RmHugeBlobs apply: %s", Guid, lsn, LogPos.EntryPointLsn, + rec.ToString().data())); for (const auto &x : rec) { Heap->RecoveryModeFree(x); } + for (const auto& x : allocated) { + Heap->RecoveryModeAllocate(x); + } *logPosDelLsn = lsn; PersistentLsn = Min(PersistentLsn, lsn); return TRlas(true, false); } else { // skip - LOG_DEBUG(ctx, BS_HULLHUGE, - VDISKP(VCtx->VDiskLogPrefix, - "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 " entryLsn# %" PRIu64 "): " - "RmHugeBlobs skip: %s", - Guid, lsn, LogPos.EntryPointLsn, rec.ToString().data())); + LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 + " entryLsn# %" PRIu64 "): " "RmHugeBlobs skip: %s", Guid, lsn, LogPos.EntryPointLsn, + rec.ToString().data())); return TRlas(true, true); } } diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h index f7b04ba7b42..ffac5bdb0ca 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h @@ -175,6 +175,7 @@ namespace NKikimr { TRlas ApplySlotsDeletion(const TActorContext &ctx, ui64 lsn, const TDiskPartVec &rec, + const TDiskPartVec& allocated, ESlotDelDbType type); TRlas Apply(const TActorContext &ctx, ui64 lsn, diff --git a/ydb/core/blobstorage/vdisk/huge/ut/ya.make b/ydb/core/blobstorage/vdisk/huge/ut/ya.make index ef3c4f7755a..f5a1d4b2d9c 100644 --- a/ydb/core/blobstorage/vdisk/huge/ut/ya.make +++ b/ydb/core/blobstorage/vdisk/huge/ut/ya.make @@ -16,7 +16,6 @@ PEERDIR( ) SRCS( - blobstorage_hullhugedefs_ut.cpp blobstorage_hullhugeheap_ctx_ut.cpp blobstorage_hullhugeheap_ut.cpp blobstorage_hullhuge_ut.cpp diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h index d849b4b9b41..c4b9f4e8ccc 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h +++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h @@ -110,7 +110,7 @@ namespace NKikimr { } } - TRope GetPart(ui8 part, TRope *holder) const { + const TRope& GetPart(ui8 part, TRope *holder) const { return GetPart(part, 0, GetPartSize(part), holder); } @@ -266,27 +266,41 @@ namespace NKikimr { // used by blob merger void MergePart(const TDiskBlob& source, TPartIterator iter) { - const ui8 part = iter.GetPartId() - 1; - Y_ABORT_UNLESS(!Rope); // ensure that this blob is used inside merger - Y_ABORT_UNLESS(FullDataSize == 0 || FullDataSize == source.FullDataSize, "FullDataSize# %" PRIu32 " source.FullDataSize# %" PRIu32, - FullDataSize, source.FullDataSize); + if (const ui8 partIdx = iter.GetPartId() - 1; Parts.Empty() || !Parts.Get(partIdx)) { + MergePart(iter.GetPart(), partIdx, source.FullDataSize, source.Parts.GetSize()); + } + } + + void MergePart(TRope&& data, ui8 partIdx, ui64 fullDataSize, ui8 totalPartCount) { + Y_ABORT_UNLESS(!Rope); + Y_ABORT_UNLESS(FullDataSize == 0 || FullDataSize == fullDataSize); if (Parts.Empty()) { - Parts = NMatrix::TVectorType(0, source.Parts.GetSize()); + Parts = NMatrix::TVectorType(0, totalPartCount); PartOffs.fill(0); // we don't care about absolute offsets here } else { - Y_ABORT_UNLESS(Parts.GetSize() == source.Parts.GetSize()); + Y_ABORT_UNLESS(Parts.GetSize() == totalPartCount); } - if (!Parts.Get(part)) { - Parts.Set(part); - TRope partData = iter.GetPart(); - for (ui8 i = part + 1; i <= Parts.GetSize(); ++i) { - PartOffs[i] += partData.GetSize(); + if (!Parts.Get(partIdx)) { + Parts.Set(partIdx); + for (ui8 i = partIdx + 1; i <= Parts.GetSize(); ++i) { + PartOffs[i] += data.GetSize(); } - Y_ABORT_UNLESS(part < PartData.size()); - PartData[part] = std::move(partData); - FullDataSize = source.FullDataSize; + Y_ABORT_UNLESS(partIdx < PartData.size()); + PartData[partIdx] = std::move(data); + FullDataSize = fullDataSize; + } + } + + void ClearPart(ui8 partIdx) { + Y_DEBUG_ABORT_UNLESS(!Parts.Empty()); + Y_DEBUG_ABORT_UNLESS(Parts.Get(partIdx)); + Parts.Clear(partIdx); + const TRope data = std::exchange(PartData[partIdx], {}); + const ui32 size = data.GetSize(); + for (ui8 i = partIdx + 1; i <= Parts.GetSize(); ++i) { + PartOffs[i] -= size; } } @@ -316,8 +330,11 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// class TDiskBlobMerger { public: - TDiskBlobMerger() - {} + TDiskBlobMerger() = default; + + TDiskBlobMerger(const TDiskBlob& blob) { + Add(blob); + } void Clear() { Blob = {}; @@ -332,6 +349,15 @@ namespace NKikimr { Blob.MergePart(source, it); } + void AddPart(TRope&& data, TBlobStorageGroupType gtype, const TLogoBlobID& id) { + Y_DEBUG_ABORT_UNLESS(id.PartId()); + Blob.MergePart(std::move(data), id.PartId() - 1, id.BlobSize(), gtype.TotalPartCount()); + } + + void ClearPart(ui8 partIdx) { + Blob.ClearPart(partIdx); + } + bool Empty() const { return Blob.Empty(); } @@ -372,54 +398,4 @@ namespace NKikimr { TDiskBlob Blob; }; - class TDiskBlobMergerWithMask : public TDiskBlobMerger { - public: - TDiskBlobMergerWithMask() = default; - TDiskBlobMergerWithMask(const TDiskBlobMergerWithMask&) = default; - TDiskBlobMergerWithMask(TDiskBlobMergerWithMask&&) = default; - - TDiskBlobMergerWithMask(const TDiskBlobMerger& base, NMatrix::TVectorType mask) - : AddFilterMask(mask) - { - // TODO(alexvru): check for saneness; maybe we shall not provide blobs not in mask? - const TDiskBlob& blob = base.GetDiskBlob(); - for (auto it = blob.begin(); it != blob.end(); ++it) { - AddPart(blob, it); - } - } - - void Clear() { - TDiskBlobMerger::Clear(); - AddFilterMask.Clear(); - } - - void SetFilterMask(NMatrix::TVectorType mask) { - Y_ABORT_UNLESS(!AddFilterMask); - AddFilterMask = mask; - } - - void Add(const TDiskBlob &addBlob) { - Y_ABORT_UNLESS(AddFilterMask); - NMatrix::TVectorType addParts = addBlob.GetParts() & *AddFilterMask; - if (!addParts.Empty()) { - TDiskBlobMerger::AddImpl(addBlob, addParts); - } - } - - void AddPart(const TDiskBlob& source, const TDiskBlob::TPartIterator& it) { - Y_ABORT_UNLESS(AddFilterMask); - if (AddFilterMask->Get(it.GetPartId() - 1)) { - TDiskBlobMerger::AddPart(source, it); - } - } - - void Swap(TDiskBlobMergerWithMask& m) { - TDiskBlobMerger::Swap(m); - DoSwap(AddFilterMask, m.AddFilterMask); - } - - private: - TMaybe<NMatrix::TVectorType> AddFilterMask; - }; - } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp index 70b622d79bd..21548fa0759 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp +++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp @@ -143,53 +143,6 @@ namespace NKikimr { } } } - - Y_UNIT_TEST(FilterMask) { - for (bool addHeader1 : {true, false}) { - for (bool addHeader2 : {true, false}) { - for (bool addHeader3 : {true, false}) { - const ui8 numParts = 6; - for (ui32 mask1 = 1; mask1 < (1 << numParts); ++mask1) { - for (ui32 mask2 = 1; mask2 < (1 << numParts); ++mask2) { - if (!(mask1 & mask2)) { - continue; - } - - NMatrix::TVectorType partsToStore(0, numParts); - for (ui8 i = 0; i < numParts; ++i) { - if (mask2 >> i & 1) { - partsToStore.Set(i); - } - } - - TDiskBlobMergerWithMask m; - m.SetFilterMask(partsToStore); - for (ui8 i = 0; i < numParts; ++i) { - if (mask1 >> i & 1) { - NMatrix::TVectorType v(0, numParts); - v.Set(i); - TRope buffer = TDiskBlob::Create(8, i + 1, numParts, TRope(Sprintf("%08x", i)), Arena, addHeader1); - m.Add(TDiskBlob(&buffer, v, GType, TLogoBlobID(0, 0, 0, 0, 8, 0))); - } - } - - TDiskBlobMerger m2; - for (ui8 i = 0; i < numParts; ++i) { - if ((mask1 & mask2) >> i & 1) { - NMatrix::TVectorType v(0, numParts); - v.Set(i); - TRope buffer = TDiskBlob::Create(8, i + 1, numParts, TRope(Sprintf("%08x", i)), Arena, addHeader2); - m2.Add(TDiskBlob(&buffer, v, GType, TLogoBlobID(0, 0, 0, 0, 8, 0))); - } - } - - UNIT_ASSERT_EQUAL(m.CreateDiskBlob(Arena, addHeader3), m2.CreateDiskBlob(Arena, addHeader3)); - } - } - } - } - } - } } } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp index ad5f8d223f0..f73a386a865 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp +++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp @@ -88,8 +88,9 @@ namespace NKikimr { THullCtx::THullCtx(TVDiskContextPtr vctx, ui32 chunkSize, ui32 compWorthReadSize, bool freshCompaction, bool gcOnlySynced, bool allowKeepFlags, bool barrierValidation, ui32 hullSstSizeInChunksFresh, ui32 hullSstSizeInChunksLevel, double hullCompFreeSpaceThreshold, ui32 freshCompMaxInFlightWrites, - ui32 hullCompMaxInFlightWrites, ui32 hullCompMaxInFlightReads, double hullCompReadBatchEfficiencyThreshold, - TDuration hullCompStorageRatioCalcPeriod, TDuration hullCompStorageRatioMaxCalcDuration, bool addHeader) + ui32 freshCompMaxInFlightReads, ui32 hullCompMaxInFlightWrites, ui32 hullCompMaxInFlightReads, + double hullCompReadBatchEfficiencyThreshold, TDuration hullCompStorageRatioCalcPeriod, + TDuration hullCompStorageRatioMaxCalcDuration, bool addHeader) : VCtx(std::move(vctx)) , IngressCache(TIngressCache::Create(VCtx->Top, VCtx->ShortSelfVDisk)) , ChunkSize(chunkSize) @@ -102,6 +103,7 @@ namespace NKikimr { , HullSstSizeInChunksLevel(hullSstSizeInChunksLevel) , HullCompFreeSpaceThreshold(hullCompFreeSpaceThreshold) , FreshCompMaxInFlightWrites(freshCompMaxInFlightWrites) + , FreshCompMaxInFlightReads(freshCompMaxInFlightReads) , HullCompMaxInFlightWrites(hullCompMaxInFlightWrites) , HullCompMaxInFlightReads(hullCompMaxInFlightReads) , HullCompReadBatchEfficiencyThreshold(hullCompReadBatchEfficiencyThreshold) diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h index 72406faee5a..d21c5096e3a 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h +++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h @@ -132,6 +132,7 @@ namespace NKikimr { const ui32 HullSstSizeInChunksLevel; const double HullCompFreeSpaceThreshold; const ui32 FreshCompMaxInFlightWrites; + const ui32 FreshCompMaxInFlightReads; const ui32 HullCompMaxInFlightWrites; const ui32 HullCompMaxInFlightReads; const double HullCompReadBatchEfficiencyThreshold; @@ -154,6 +155,7 @@ namespace NKikimr { ui32 hullSstSizeInChunksLevel, double hullCompFreeSpaceThreshold, ui32 freshCompMaxInFlightWrites, + ui32 freshCompMaxInFlightReads, ui32 hullCompMaxInFlightWrites, ui32 hullCompMaxInFlightReads, double hullCompReadBatchEfficiencyThreshold, diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h index 55ec8622805..604a0aa3162 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h +++ b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h @@ -71,11 +71,6 @@ namespace NKikimr { TabletId, Channel, Gen, GenCounter, Hard ? "hard" : "soft"); } - TLogoBlobID LogoBlobID() const { - return TLogoBlobID(); - } - - void Serialize(NKikimrBlobStorage::TBarrierKey &proto) const { proto.SetTabletId(TabletId); proto.SetChannel(Channel); diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h index 0e804112870..89f4145164a 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h +++ b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h @@ -33,10 +33,6 @@ namespace NKikimr { return Sprintf("[%16" PRIu64 "]", TabletId); } - TLogoBlobID LogoBlobID() const { - return TLogoBlobID(); - } - static TKeyBlock First() { return TKeyBlock(); } diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h b/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h index c2777d6ee9c..84916881f01 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h +++ b/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h @@ -27,6 +27,7 @@ namespace NKikimr { 1, // HullSstSizeInChunksLevel 2.0, 10, // FreshCompMaxInFlightWrites + 10, // FreshCompMaxInFlightReads 10, // HullCompMaxInFlightWrites 20, // HullCompMaxInFlightReads 0.5, diff --git a/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h b/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h index 2c579e4260e..894fbf9fcb4 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h +++ b/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h @@ -74,6 +74,7 @@ namespace NKikimr { TLeveledSsts TablesToAdd; // huge blobs to delete TDiskPartVec HugeBlobsToDelete; + TDiskPartVec HugeBlobsAllocated; // is data finalized bool Finalized = false; @@ -101,6 +102,11 @@ namespace NKikimr { return HugeBlobsToDelete; } + const TDiskPartVec &GetHugeBlobsAllocated() const { + Y_ABORT_UNLESS(Finalized); + return HugeBlobsAllocated; + } + TDiskPartVec ExtractHugeBlobsToDelete() { Y_ABORT_UNLESS(Finalized); return std::move(HugeBlobsToDelete); @@ -202,6 +208,7 @@ namespace NKikimr { using TBase::TablesToDelete; using TBase::TablesToAdd; using TBase::HugeBlobsToDelete; + using TBase::HugeBlobsAllocated; ui32 TargetLevel = (ui32)(-1); TKey LastCompactedKey = TKey::First(); @@ -241,7 +248,8 @@ namespace NKikimr { void CompactionFinished( TOrderedLevelSegmentsPtr &&segVec, - TDiskPartVec &&hugeBlobsToDelete, + TDiskPartVec&& hugeBlobsToDelete, + TDiskPartVec&& hugeBlobsAllocated, bool aborted) { if (aborted) { @@ -249,9 +257,11 @@ namespace NKikimr { TablesToDelete.Clear(); TablesToAdd.Clear(); HugeBlobsToDelete.Clear(); + HugeBlobsAllocated.Clear(); } else { Y_ABORT_UNLESS(!TablesToDelete.Empty()); HugeBlobsToDelete = std::move(hugeBlobsToDelete); + HugeBlobsAllocated = std::move(hugeBlobsAllocated); if (segVec) { TLeveledSsts tmp(TargetLevel, *segVec); TablesToAdd.Swap(tmp); @@ -316,6 +326,10 @@ namespace NKikimr { return GetPtr()->GetHugeBlobsToDelete(); } + const TDiskPartVec &GetHugeBlobsAllocated() const { + return GetPtr()->GetHugeBlobsAllocated(); + } + TDiskPartVec ExtractHugeBlobsToDelete() { return const_cast<TBase*>(GetPtr())->ExtractHugeBlobsToDelete(); } diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h index adbca0fa8f2..ab7282f0c82 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h @@ -3,101 +3,351 @@ #include "defs.h" #include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h> #include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h> +#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_logoblob.h> #include <ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h> #include <util/generic/noncopyable.h> namespace NKikimr { - //////////////////////////////////////////////////////////////////////////// - // TDataMerger - //////////////////////////////////////////////////////////////////////////// class TDataMerger : TNonCopyable { + public: + struct TCollectTask { + TDiskBlobMerger BlobMerger; // base blob for compaction (obtained from in-memory records) + std::vector<std::tuple<TDiskPart, ui8>> Reads; // a set of extra reads _of distinct parts, not blobs_ + + void Clear() { + BlobMerger.Clear(); + Reads.clear(); + } + }; + + struct THugeBlobWrite { + ui8 PartIdx; + const TRope *From; + TDiskPart To; + }; + + struct THugeBlobMove { + ui8 PartIdx; + TDiskPart From; + TDiskPart To; + }; + private: - TDiskBlobMerger DiskBlobMerger; - NHuge::TBlobMerger HugeBlobMerger; + struct TPart { + const TRope *InMemData = nullptr; + TDiskPart SmallBlobPart; // a reference to part data, without any headers + TDiskPart HugeBlob; // a reference to the whole huge blob, with headers added + ui64 HugeBlobCircaLsn = 0; + ui32 HugePartSize = 0; + bool IsMetadataPart = false; + + bool NeedHugeSlot() const { + return HugeBlob.Empty() && (InMemData || !SmallBlobPart.Empty()); + } + }; + + // immutable fields + TBlobStorageGroupType GType; + bool AddHeader = false; + + // clearable fields + std::vector<TPart> Parts; + std::vector<TDiskPart> DeletedHugeBlobs; + NMatrix::TVectorType PartsMask; // mask of all present parts + std::vector<TDiskPart> SavedHugeBlobs; + bool Finished = false; + std::vector<ui32> SlotsToAllocate; + TCollectTask CollectTask; + std::vector<THugeBlobWrite> HugeBlobWrites; + std::vector<THugeBlobMove> HugeBlobMoves; public: - TDataMerger() - : DiskBlobMerger() - , HugeBlobMerger() + TDataMerger(TBlobStorageGroupType gtype, bool addHeader) + : GType(gtype) + , AddHeader(addHeader) + , Parts(GType.TotalPartCount()) + , PartsMask(0, GType.TotalPartCount()) {} bool Empty() const { - return DiskBlobMerger.Empty() && HugeBlobMerger.Empty(); + return PartsMask.Empty(); } void Clear() { - DiskBlobMerger.Clear(); - HugeBlobMerger.Clear(); + std::ranges::fill(Parts, TPart()); + DeletedHugeBlobs.clear(); + PartsMask.Clear(); + SavedHugeBlobs.clear(); + Finished = false; + SlotsToAllocate.clear(); + CollectTask.Clear(); + HugeBlobWrites.clear(); + HugeBlobMoves.clear(); } - void Swap(TDataMerger &m) { - DiskBlobMerger.Swap(m.DiskBlobMerger); - HugeBlobMerger.Swap(m.HugeBlobMerger); - } + void Add(const TMemRecLogoBlob& memRec, std::variant<const TRope*, const TDiskPart*> dataOrOutbound, + ui64 circaLsn, const TLogoBlobID& fullId) { + const NMatrix::TVectorType parts = memRec.GetLocalParts(GType); + + if (memRec.GetType() == TBlobType::MemBlob) { + auto data = std::visit<const TRope*>(TOverloaded{ + [&](const TRope *x) { return x; }, + [&](const TDiskPart*) { return nullptr; } + }, dataOrOutbound); - TBlobType::EType GetType() const { - Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty() || DiskBlobMerger.Empty()); - if (!HugeBlobMerger.Empty()) { - return HugeBlobMerger.GetBlobType(); + Y_ABORT_UNLESS(data); + Y_DEBUG_ABORT_UNLESS(parts.CountBits() == 1); // only single part per record in memory + for (ui8 partIdx : parts) { + Y_DEBUG_ABORT_UNLESS(partIdx < Parts.size()); + Parts[partIdx].InMemData = data; + PartsMask.Set(partIdx); + } } else { - // both for !DiskBlobMerger.Empty() and DiskBlobMerger.Empty() - return TBlobType::DiskBlob; + auto outbound = std::visit<const TDiskPart*>(TOverloaded{ + [&](const TRope *x) { Y_DEBUG_ABORT_UNLESS(!x); return nullptr; }, + [&](const TDiskPart *x) { return x; } + }, dataOrOutbound); + + TDiskDataExtractor extr; + memRec.GetDiskData(&extr, outbound); + + if (memRec.GetType() == TBlobType::DiskBlob) { + const TDiskPart& location = extr.SwearOne(); + ui32 offset = AddHeader ? TDiskBlob::HeaderSize : 0; + for (ui8 partIdx : parts) { + const ui32 partSize = GType.PartSize(TLogoBlobID(fullId, partIdx + 1)); + Y_DEBUG_ABORT_UNLESS(partIdx < Parts.size()); + if (partSize) { + Parts[partIdx].SmallBlobPart = {location.ChunkIdx, location.Offset + offset, partSize}; + } + offset += partSize; + PartsMask.Set(partIdx); + } + } else { + AddHugeBlob(extr.Begin, extr.End, parts, circaLsn); + } } } - ui32 GetInplacedSize(bool addHeader) const { - Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty() || DiskBlobMerger.Empty()); - return HugeBlobMerger.Empty() ? DiskBlobMerger.GetDiskBlob().GetBlobSize(addHeader) : 0; - } + void Finish(bool targetingHugeBlob, const TLogoBlobID& fullId, TBlobType::EType *type, ui32 *inplacedDataSize) { + Y_DEBUG_ABORT_UNLESS(!Finished); + + if (!Empty()) { + // scan through all the parts, see what we got + NMatrix::TVectorType inMemParts(0, GType.TotalPartCount()); + NMatrix::TVectorType smallDiskParts(0, GType.TotalPartCount()); + NMatrix::TVectorType hugeDiskParts(0, GType.TotalPartCount()); + for (ui8 partIdx : PartsMask) { + TPart& part = Parts[partIdx]; + if (part.InMemData) { + inMemParts.Set(partIdx); + } + if (!part.SmallBlobPart.Empty()) { + smallDiskParts.Set(partIdx); + } + if (!part.HugeBlob.Empty()) { + hugeDiskParts.Set(partIdx); + } + } + + bool producingHugeBlob = false; + + if (inMemParts.Empty() && smallDiskParts.Empty()) { // we only have huge blobs, so keep it this way + producingHugeBlob = true; + } else { + producingHugeBlob = targetingHugeBlob; + } - void AddHugeBlob(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType &parts, - ui64 circaLsn) { - Y_DEBUG_ABORT_UNLESS(DiskBlobMerger.Empty()); - HugeBlobMerger.Add(begin, end, parts, circaLsn); + // calculate blob for if we are going to keep it inplace + *inplacedDataSize = producingHugeBlob + ? 0 + : TDiskBlob::CalculateBlobSize(GType, fullId, PartsMask, AddHeader); + + TDiskBlobMerger merger; + + for (ui8 partIdx : PartsMask) { + TPart& part = Parts[partIdx]; + const ui32 partSize = GType.PartSize(TLogoBlobID(fullId, partIdx + 1)); + part.IsMetadataPart = !partSize; + const NMatrix::TVectorType partMask = NMatrix::TVectorType::MakeOneHot(partIdx, GType.TotalPartCount()); + + if (producingHugeBlob) { + SavedHugeBlobs.push_back(part.HugeBlob); + if (!part.IsMetadataPart && part.NeedHugeSlot()) { + part.HugePartSize = TDiskBlob::CalculateBlobSize(GType, fullId, partMask, AddHeader); + SlotsToAllocate.push_back(part.HugePartSize); + } + } else { + if (part.InMemData) { // prefer in-memory data if we have options + merger.Add(TDiskBlob(part.InMemData, partMask, GType, fullId)); + } else if (!part.SmallBlobPart.Empty()) { + CollectTask.Reads.emplace_back(part.SmallBlobPart, partIdx); + } else if (!part.HugeBlob.Empty()) { // dropping this huge part after compaction + TDiskPart location; + if (part.HugeBlob.Size == partSize) { + location = part.HugeBlob; + } else if (part.HugeBlob.Size == partSize + TDiskBlob::HeaderSize) { + location = TDiskPart(part.HugeBlob.ChunkIdx, part.HugeBlob.Offset + TDiskBlob::HeaderSize, + part.HugeBlob.Size - TDiskBlob::HeaderSize); + } else { + Y_ABORT("incorrect huge blob size"); + } + CollectTask.Reads.emplace_back(location, partIdx); + DeletedHugeBlobs.push_back(part.HugeBlob); + } else { // add metadata part to merger + merger.AddPart(TRope(), GType, TLogoBlobID(fullId, partIdx + 1)); + } + } + } + + Y_DEBUG_ABORT_UNLESS(!producingHugeBlob || merger.Empty()); + CollectTask.BlobMerger = merger; + + *type = !producingHugeBlob ? TBlobType::DiskBlob : + PartsMask.CountBits() > 1 ? TBlobType::ManyHugeBlobs : TBlobType::HugeBlob; + + Y_DEBUG_ABORT_UNLESS(SavedHugeBlobs.size() == (producingHugeBlob ? PartsMask.CountBits() : 0)); + } + + Finished = true; } - void AddDeletedHugeBlob(const TDiskPart& part) { - HugeBlobMerger.AddDeletedPart(part); + void FinishFromBlob() { + Y_DEBUG_ABORT_UNLESS(!Finished); + Finished = true; } - void AddBlob(const TDiskBlob &addBlob) { - Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty()); - DiskBlobMerger.Add(addBlob); + void AddHugeBlob(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType& parts, ui64 circaLsn) { + Y_DEBUG_ABORT_UNLESS(parts.CountBits() == end - begin); + const TDiskPart *location = begin; + for (ui8 partIdx : parts) { + Y_DEBUG_ABORT_UNLESS(partIdx < Parts.size()); + auto& part = Parts[partIdx]; + if (location->ChunkIdx) { // consider only data parts + if (part.HugeBlob.Empty() || part.HugeBlobCircaLsn < circaLsn) { + if (!part.HugeBlob.Empty()) { + DeletedHugeBlobs.push_back(part.HugeBlob); + } + part.HugeBlob = *location; + part.HugeBlobCircaLsn = circaLsn; + } else { + DeletedHugeBlobs.push_back(*location); + } + } + ++location; + } + Y_DEBUG_ABORT_UNLESS(location == end); + PartsMask |= parts; } - void AddPart(const TDiskBlob& source, const TDiskBlob::TPartIterator& iter) { - Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty()); - DiskBlobMerger.AddPart(source, iter); + template<typename T> + T& CheckFinished(T& value) const { + Y_DEBUG_ABORT_UNLESS(Finished); + return value; } - void SetEmptyFromAnotherMerger(const TDataMerger *dataMerger) { - DiskBlobMerger.Clear(); - HugeBlobMerger.SetEmptyFromAnotherMerger(&dataMerger->HugeBlobMerger); + const NMatrix::TVectorType GetParts() const { return CheckFinished(PartsMask); } + const std::vector<TDiskPart>& GetSavedHugeBlobs() const { return CheckFinished(SavedHugeBlobs); } + const std::vector<TDiskPart>& GetDeletedHugeBlobs() const { return CheckFinished(DeletedHugeBlobs); } + const TCollectTask& GetCollectTask() const { return CheckFinished(CollectTask); } + const auto& GetHugeBlobWrites() const { return CheckFinished(HugeBlobWrites); } + const auto& GetHugeBlobMoves() const { return CheckFinished(HugeBlobMoves); } + + bool Ready() const { + Y_DEBUG_ABORT_UNLESS(Finished); + return CollectTask.Reads.empty() + && HugeBlobWrites.empty() + && HugeBlobMoves.empty(); } - bool HasSmallBlobs() const { - return !DiskBlobMerger.Empty(); + TRope CreateDiskBlob(TRopeArena& arena) { + Y_DEBUG_ABORT_UNLESS(Finished); + Y_DEBUG_ABORT_UNLESS(!CollectTask.BlobMerger.Empty()); + return CollectTask.BlobMerger.CreateDiskBlob(arena, AddHeader); } - ui32 GetDiskBlobRawSize(bool addHeader) const { - Y_DEBUG_ABORT_UNLESS(!DiskBlobMerger.Empty()); - return DiskBlobMerger.GetDiskBlob().GetBlobSize(addHeader); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Huge slots operation + + std::vector<ui32>& GetSlotsToAllocate() { + Y_DEBUG_ABORT_UNLESS(Finished); + return SlotsToAllocate; } - const TDiskBlobMerger &GetDiskBlobMerger() const { - return DiskBlobMerger; + void ApplyAllocatedSlots(std::vector<TDiskPart>& allocatedSlots) { + size_t savedIndex = 0; + size_t index = 0; + for (ui8 partIdx : PartsMask) { + if (const TPart& part = Parts[partIdx]; !part.HugeBlob.Empty()) { + Y_DEBUG_ABORT_UNLESS(part.HugeBlob == SavedHugeBlobs[savedIndex]); + ++savedIndex; + } else if (!part.IsMetadataPart && part.NeedHugeSlot()) { + TDiskPart& location = SavedHugeBlobs[savedIndex++] = allocatedSlots[index++]; + Y_ABORT_UNLESS(part.HugePartSize <= location.Size); + location.Size = part.HugePartSize; + if (part.InMemData) { + HugeBlobWrites.push_back({ + .PartIdx = partIdx, + .From = part.InMemData, + .To = location, + }); + } else if (!part.SmallBlobPart.Empty()) { + HugeBlobMoves.push_back({ + .PartIdx = partIdx, + .From = part.SmallBlobPart, + .To = location, + }); + } else { + Y_ABORT("impossible case"); + } + } + } + Y_DEBUG_ABORT_UNLESS(savedIndex == SavedHugeBlobs.size()); + Y_DEBUG_ABORT_UNLESS(index == allocatedSlots.size()); } - const NHuge::TBlobMerger &GetHugeBlobMerger() const { - return HugeBlobMerger; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // THandoffMap transformations; called before any reads were done, metadata-only processing (but within Fresh + // compation some data might be already available) + + void MakeEmpty() { + Y_DEBUG_ABORT_UNLESS(Finished); + DeletedHugeBlobs.insert(DeletedHugeBlobs.end(), SavedHugeBlobs.begin(), SavedHugeBlobs.end()); + auto deletedHugeBlobs = std::move(DeletedHugeBlobs); + Clear(); + DeletedHugeBlobs = std::move(deletedHugeBlobs); + Finished = true; } - TString ToString() const { - TStringStream str; - str << "{DiskBlobMerger# " << DiskBlobMerger.ToString(); - str << " HugeBlobMerger# " << HugeBlobMerger.ToString() << "}"; - return str.Str(); + void FilterLocalParts(NMatrix::TVectorType remainingLocalParts, const TLogoBlobID& fullId, + TBlobType::EType *type, ui32 *inplacedDataSize) { + Y_DEBUG_ABORT_UNLESS(Finished); + const NMatrix::TVectorType partsToRemove = PartsMask & ~remainingLocalParts; + for (ui8 partIdx : partsToRemove) { // local parts to remove + if (const auto& part = Parts[partIdx].HugeBlob; !part.Empty()) { + DeletedHugeBlobs.push_back(part); + const auto it = std::remove(SavedHugeBlobs.begin(), SavedHugeBlobs.end(), part); + Y_ABORT_UNLESS(std::next(it) == SavedHugeBlobs.end()); + SavedHugeBlobs.pop_back(); + } else if (Parts[partIdx].InMemData) { + CollectTask.BlobMerger.ClearPart(partIdx); + } + Parts[partIdx] = {}; + } + + auto pred = [&](const std::tuple<TDiskPart, ui8>& x) { return partsToRemove.Get(std::get<1>(x)); }; + CollectTask.Reads.erase(std::remove_if(CollectTask.Reads.begin(), CollectTask.Reads.end(), pred), + CollectTask.Reads.end()); + + PartsMask &= remainingLocalParts; + + // recalculate memrec data for this new entry + *type = SavedHugeBlobs.empty() ? TBlobType::DiskBlob : + SavedHugeBlobs.size() == 1 ? TBlobType::HugeBlob : TBlobType::ManyHugeBlobs; + if (!Empty() && *type == TBlobType::DiskBlob) { + *inplacedDataSize = TDiskBlob::CalculateBlobSize(GType, fullId, PartsMask, AddHeader); + } } }; diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h index cf6eba18ab9..077393edf80 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h @@ -123,163 +123,74 @@ namespace NKikimr { using TBase::Empty; using TBase::AddBasic; - enum class ELoadData { - NotSet = 0, - LoadData = 1, - DontLoadData = 2 - }; - public: - TCompactRecordMerger(const TBlobStorageGroupType >ype, bool addHeader) + TCompactRecordMerger(TBlobStorageGroupType gtype, bool addHeader) : TBase(gtype, true) , AddHeader(addHeader) + , DataMerger(gtype, addHeader) {} void Clear() { TBase::Clear(); - MemRecs.clear(); - ProducingSmallBlob = false; - ProducingHugeBlob = false; - NeedToLoadData = ELoadData::NotSet; DataMerger.Clear(); + Key = {}; } using TBase::GetMemRec; using TBase::HaveToMergeData; - void SetLoadDataMode(bool loadData) { - NeedToLoadData = loadData ? ELoadData::LoadData : ELoadData::DontLoadData; - } - void AddFromSegment(const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) { - Add(memRec, nullptr, outbound, key, circaLsn); + Add(memRec, outbound, key, circaLsn); } void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) { - Add(memRec, data, nullptr, key, lsn); + Add(memRec, data, key, lsn); } - void Add(const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) { - Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet); + void Add(const TMemRec& memRec, std::variant<const TRope*, const TDiskPart*> dataOrOutbound, const TKey& key, ui64 lsn) { + Y_DEBUG_ABORT_UNLESS(Key == TKey{} || Key == key); + Key = key; AddBasic(memRec, key); - if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) { - TDiskDataExtractor extr; - switch (memRec.GetType()) { - case TBlobType::MemBlob: - case TBlobType::DiskBlob: - if (NeedToLoadData == ELoadData::LoadData) { - if (data) { - // we have some data in-memory - DataMerger.AddBlob(TDiskBlob(data, local, GType, key.LogoBlobID())); - } - if (memRec.GetType() == TBlobType::DiskBlob) { - if (memRec.HasData()) { // there is something to read from the disk - MemRecs.push_back(memRec); - } else { // headerless metadata stored - static TRope emptyRope; - DataMerger.AddBlob(TDiskBlob(&emptyRope, local, GType, key.LogoBlobID())); - } - } - Y_DEBUG_ABORT_UNLESS(!ProducingHugeBlob); - ProducingSmallBlob = true; - } - break; - - case TBlobType::ManyHugeBlobs: - Y_ABORT_UNLESS(outbound); - [[fallthrough]]; - case TBlobType::HugeBlob: - memRec.GetDiskData(&extr, outbound); - DataMerger.AddHugeBlob(extr.Begin, extr.End, local, lsn); - Y_DEBUG_ABORT_UNLESS(!ProducingSmallBlob); - ProducingHugeBlob = true; - break; - } + if constexpr (std::is_same_v<TKey, TKeyLogoBlob>) { + DataMerger.Add(memRec, dataOrOutbound, lsn, key.LogoBlobID()); } - VerifyConsistency(memRec, outbound); } - void VerifyConsistency(const TMemRec& memRec, const TDiskPart *outbound) { -#ifdef NDEBUG - return; -#endif - if constexpr (std::is_same_v<TMemRec, TMemRecLogoBlob>) { - if (const auto m = memRec.GetType(); m == TBlobType::HugeBlob || m == TBlobType::ManyHugeBlobs) { - TDiskDataExtractor extr; - memRec.GetDiskData(&extr, outbound); - Y_ABORT_UNLESS(extr.End - extr.Begin == memRec.GetIngress().LocalParts(GType).CountBits()); - } - switch (memRec.GetType()) { - case TBlobType::DiskBlob: - Y_ABORT_UNLESS(!memRec.HasData() || DataMerger.GetHugeBlobMerger().Empty()); - break; - - case TBlobType::HugeBlob: - case TBlobType::ManyHugeBlobs: - Y_ABORT_UNLESS(memRec.HasData() && DataMerger.GetDiskBlobMerger().Empty()); - break; - - case TBlobType::MemBlob: - Y_ABORT_UNLESS(memRec.HasData() && DataMerger.GetHugeBlobMerger().Empty()); - break; - } - VerifyConsistency(); - } - } + void Finish(bool targetingHugeBlob) { + Y_DEBUG_ABORT_UNLESS(!Finished); + Y_DEBUG_ABORT_UNLESS(!Empty()); - void VerifyConsistency() { - if constexpr (std::is_same_v<TMemRec, TMemRecLogoBlob>) { - Y_DEBUG_ABORT_UNLESS(DataMerger.GetHugeBlobMerger().Empty() || MemRec.GetIngress().LocalParts(GType).CountBits() == - DataMerger.GetHugeBlobMerger().GetNumParts()); - } - } + MemRec.SetNoBlob(); - void Finish() { - if (NeedToLoadData == ELoadData::DontLoadData) { - Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger - // if we have huge blobs for the record, than we set TBlobType::HugeBlob or - // TBlobType::ManyHugeBlobs a few lines below - MemRec.SetNoBlob(); - } + if constexpr (std::is_same_v<TKey, TKeyLogoBlob>) { + TBlobType::EType type = TBlobType::DiskBlob; + ui32 inplacedDataSize = 0; + DataMerger.Finish(targetingHugeBlob, Key.LogoBlobID(), &type, &inplacedDataSize); - Y_DEBUG_ABORT_UNLESS(!Empty()); - VerifyConsistency(); + const NMatrix::TVectorType localParts = MemRec.GetLocalParts(GType); + Y_ABORT_UNLESS(localParts == DataMerger.GetParts()); - // in case when we keep data and disk merger contains small blobs, we set up small blob record -- this logic - // is used in replication and in fresh compaction - if (NeedToLoadData == ELoadData::LoadData && DataMerger.HasSmallBlobs()) { - TDiskPart addr(0, 0, DataMerger.GetDiskBlobRawSize(AddHeader)); - MemRec.SetDiskBlob(addr); - } + MemRec.SetType(type); - // clear local bits if we are not going to keep item's data - if (NeedToLoadData == ELoadData::DontLoadData) { - MemRec.ClearLocalParts(GType); + if (type == TBlobType::DiskBlob && inplacedDataSize) { + Y_ABORT_UNLESS(inplacedDataSize == TDiskBlob::CalculateBlobSize(GType, Key.LogoBlobID(), + localParts, AddHeader)); + MemRec.SetDiskBlob(TDiskPart(0, 0, inplacedDataSize)); + } } Finished = true; - MemRec.SetType(DataMerger.GetType()); } - const TDataMerger *GetDataMerger() const { + TDataMerger& GetDataMerger() { Y_DEBUG_ABORT_UNLESS(Finished); - return &DataMerger; - } - - template<typename TCallback> - void ForEachSmallDiskBlob(TCallback&& callback) { - for (const auto& memRec : MemRecs) { - callback(memRec); - } + return DataMerger; } protected: - TStackVec<TMemRec, 16> MemRecs; - bool ProducingSmallBlob = false; - bool ProducingHugeBlob = false; - ELoadData NeedToLoadData = ELoadData::NotSet; - TDataMerger DataMerger; const bool AddHeader; + TDataMerger DataMerger; + TKey Key; }; //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h index 748f6d56f7c..b0f9608fb33 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h @@ -507,49 +507,40 @@ namespace NKikimr { // check that keys are coming in strictly ascending order Y_ABORT_UNLESS(Recs.empty() || Recs.back().Key < key); - switch (memRec.GetType()) { - case TBlobType::DiskBlob: { + TMemRec newMemRec(memRec); + + switch (const TBlobType::EType type = memRec.GetType()) { + case TBlobType::DiskBlob: InplaceDataTotalSize += memRec.DataSize(); ItemsWithInplacedData += !!memRec.DataSize(); - Recs.push_back(TRec(key, memRec)); break; - } - case TBlobType::HugeBlob: { - const TVector<TDiskPart> &saved = dataMerger->GetHugeBlobMerger().SavedData(); - Y_ABORT_UNLESS(saved.size() == 1); - TMemRec memRecTmp(memRec); - memRecTmp.SetHugeBlob(saved.at(0)); - HugeDataTotalSize += memRecTmp.DataSize(); - ItemsWithHugeData++; - Recs.push_back(TRec(key, memRecTmp)); - break; - } + case TBlobType::HugeBlob: case TBlobType::ManyHugeBlobs: { - auto beg = dataMerger->GetHugeBlobMerger().SavedData().begin(); - auto end = dataMerger->GetHugeBlobMerger().SavedData().end(); - - Y_DEBUG_ABORT_UNLESS(beg + 1 < end); - TMemRec newMemRec(memRec); - ui32 idx = ui32(Outbound.size()); - ui32 num = ui32(end - beg); - ui32 size = 0; - for (auto it = beg; it != end; ++it) { - size += it->Size; - } - newMemRec.SetManyHugeBlobs(idx, num, size); - for (auto it = beg; it != end; ++it) { - Outbound.push_back(*it); + Y_DEBUG_ABORT_UNLESS(dataMerger); + const std::vector<TDiskPart>& saved = dataMerger->GetSavedHugeBlobs(); + + if (saved.size() == 1) { + newMemRec.SetHugeBlob(saved.front()); + } else { + ui32 size = 0; + for (const TDiskPart& part : saved) { + size += part.Size; + } + newMemRec.SetManyHugeBlobs(Outbound.size(), saved.size(), size); + Outbound.insert(Outbound.end(), saved.begin(), saved.end()); } - HugeDataTotalSize += size + sizeof(TDiskPart) * num; - Recs.push_back(TRec(key, newMemRec)); ItemsWithHugeData++; + HugeDataTotalSize += newMemRec.DataSize() + (saved.size() > 1 ? saved.size() * sizeof(TDiskPart) : 0); break; } - default: Y_ABORT("Impossible case"); + + default: + Y_ABORT("Impossible case"); } + Recs.emplace_back(key, newMemRec); ++Items; } @@ -831,6 +822,7 @@ namespace NKikimr { , ChunkSize(chunkSize) , Arena(arena) , AddHeader(addHeader) + , GType(vctx->Top->GType) {} bool Empty() const { @@ -844,10 +836,12 @@ namespace NKikimr { return IndexBuilder.GetUsageAfterPush(chunks, intermSize, numAddedOuts) <= ChunksToUse; } - bool PushIndexOnly(const TKey& key, const TMemRec& memRec, const TDataMerger *dataMerger, ui32 inplacedDataSize, - TDiskPart *location) { + bool PushIndexOnly(const TKey& key, const TMemRec& memRec, const TDataMerger *dataMerger, TDiskPart *location) { + Y_ABORT_UNLESS((std::is_same_v<TKey, TKeyLogoBlob>) == !!dataMerger); + // inplacedDataSize must be nonzero for DiskBlob with data and zero in all other cases - ui32 numAddedOuts = dataMerger->GetHugeBlobMerger().SavedData().size(); + const ui32 inplacedDataSize = memRec.GetType() == TBlobType::DiskBlob ? memRec.DataSize() : 0; + const ui32 numAddedOuts = dataMerger ? dataMerger->GetSavedHugeBlobs().size() : 0; if (!CheckSpace(inplacedDataSize, numAddedOuts)) { return false; } @@ -867,9 +861,18 @@ namespace NKikimr { case TBlobType::DiskBlob: Y_DEBUG_ABORT_UNLESS(numAddedOuts == 0); + if constexpr (std::is_same_v<TKey, TKeyLogoBlob>) { + const NMatrix::TVectorType localParts = memRec.GetLocalParts(GType); + Y_ABORT_UNLESS(inplacedDataSize == (localParts.Empty() ? 0 : TDiskBlob::CalculateBlobSize(GType, + key.LogoBlobID(), memRec.GetLocalParts(GType), AddHeader))); + } else { + Y_ABORT_UNLESS(inplacedDataSize == 0); + } if (inplacedDataSize) { *location = DataWriter.Preallocate(inplacedDataSize); memRecToAdd.SetDiskBlob(*location); + } else { + memRecToAdd.SetNoBlob(); } break; @@ -885,32 +888,6 @@ namespace NKikimr { return DataWriter.Push(std::move(buffer)); } - // return false on no-more-space - bool Push(const TKey &key, const TMemRec &memRec, const TDataMerger *dataMerger, ui32 inplacedDataSize = 0) { - // if this is filled disk blob, then we extract data and calculate inplacedDataSize - TRope data; - const auto& diskBlobMerger = dataMerger->GetDiskBlobMerger(); - if (memRec.GetType() == TBlobType::DiskBlob && !diskBlobMerger.Empty()) { - data = diskBlobMerger.CreateDiskBlob(Arena, AddHeader); - Y_DEBUG_ABORT_UNLESS(!inplacedDataSize); - inplacedDataSize = data.GetSize(); - } - - TDiskPart preallocatedLocation; - if (!PushIndexOnly(key, memRec, dataMerger, inplacedDataSize, &preallocatedLocation)) { - return false; - } - - // write inplace data if we have some - if (data) { - Y_DEBUG_ABORT_UNLESS(data.GetSize() == inplacedDataSize); - TDiskPart writtenLocation = DataWriter.Push(data); - Y_DEBUG_ABORT_UNLESS(writtenLocation == preallocatedLocation); - } - - return true; - } - // returns true when done, false means 'continue calling me, I'have more chunks to write' bool FlushNext(ui64 firstLsn, ui64 lastLsn, ui32 maxMsgs) { if (!DataWriter.IsFinished()) { @@ -943,6 +920,7 @@ namespace NKikimr { const ui32 ChunkSize; TRopeArena& Arena; const bool AddHeader; + const TBlobStorageGroupType GType; // pending messages TQueue<std::unique_ptr<NPDisk::TEvChunkWrite>> MsgQueue; diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp index 385996f4735..91121fbaace 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp @@ -171,18 +171,24 @@ namespace NKikimr { memRec.SetDiskBlob(TDiskPart(0, 0, data.size())); merger.Clear(); - merger.SetLoadDataMode(true); merger.AddFromFresh(memRec, &blobBuf, key, step + 1); - merger.Finish(); - - bool pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger()); - if (!pushRes) { + merger.Finish(false); + + auto push = [&] { + TDiskPart preallocatedLocation; + bool pushRes = WriterPtr->PushIndexOnly(key, merger.GetMemRec(), &merger.GetDataMerger(), &preallocatedLocation); + if (pushRes && merger.GetMemRec().GetType() == TBlobType::DiskBlob && merger.GetMemRec().DataSize()) { + const TDiskPart writtenLocation = WriterPtr->PushDataOnly(merger.GetDataMerger().CreateDiskBlob(Arena)); + Y_ABORT_UNLESS(writtenLocation == preallocatedLocation); + } + return pushRes; + }; + if (!push()) { Finish(step); WriterPtr = std::make_unique<TWriterLogoBlob>(TestCtx.GetVCtx(), EWriterDataType::Fresh, ChunksToUse, Owner, OwnerRound, ChunkSize, AppendBlockSize, WriteBlockSize, 0, false, ReservedChunks, Arena, true); - pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger()); - Y_ABORT_UNLESS(pushRes); + Y_ABORT_UNLESS(push()); } while (auto msg = WriterPtr->GetPendingMessage()) { Apply(msg); @@ -212,20 +218,26 @@ namespace NKikimr { memRec2.SetHugeBlob(TDiskPart(1, 2, 3)); merger.Clear(); - merger.SetLoadDataMode(true); merger.AddFromFresh(memRec1, nullptr, key, 1); merger.AddFromFresh(memRec2, nullptr, key, 2); - merger.Finish(); - - bool pushRes = WriterPtr->Push(key, merger.GetMemRec(), merger.GetDataMerger()); - if (!pushRes) { + merger.Finish(false); + + auto push = [&] { + TDiskPart preallocatedLocation; + bool pushRes = WriterPtr->PushIndexOnly(key, merger.GetMemRec(), &merger.GetDataMerger(), &preallocatedLocation); + if (pushRes && merger.GetMemRec().GetType() == TBlobType::DiskBlob && merger.GetMemRec().DataSize()) { + const TDiskPart writtenLocation = WriterPtr->PushDataOnly(merger.GetDataMerger().CreateDiskBlob(Arena)); + Y_ABORT_UNLESS(writtenLocation == preallocatedLocation); + } + return pushRes; + }; + if (!push()) { Finish(step); WriterPtr = std::make_unique<TWriterLogoBlob>(TestCtx.GetVCtx(), EWriterDataType::Fresh, ChunksToUse, Owner, OwnerRound, ChunkSize, AppendBlockSize, WriteBlockSize, 0, false, ReservedChunks, Arena, true); - pushRes = WriterPtr->Push(key, merger.GetMemRec(), merger.GetDataMerger()); - Y_ABORT_UNLESS(pushRes); + Y_ABORT_UNLESS(push()); } while (auto msg = WriterPtr->GetPendingMessage()) { Apply(msg); @@ -243,19 +255,16 @@ namespace NKikimr { TKeyBlock key(34 + gen); TMemRecBlock memRec(gen); merger.Clear(); - merger.SetLoadDataMode(true); merger.AddFromFresh(memRec, nullptr, key, gen + 1); - merger.Finish(); + merger.Finish(false); - bool pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger()); - if (!pushRes) { + if (!WriterPtr->PushIndexOnly(key, memRec, nullptr, nullptr)) { Finish(gen); WriterPtr = std::make_unique<TWriterBlock>(TestCtx.GetVCtx(), EWriterDataType::Fresh, ChunksToUse, Owner, OwnerRound, ChunkSize, AppendBlockSize, WriteBlockSize, 0, false, ReservedChunks, Arena, true); - pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger()); - Y_ABORT_UNLESS(pushRes); + Y_ABORT_UNLESS(WriterPtr->PushIndexOnly(key, merger.GetMemRec(), nullptr, nullptr)); } while (auto msg = WriterPtr->GetPendingMessage()) { Apply(msg); diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp index e7063f4ebf1..feb929a6291 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp @@ -200,7 +200,7 @@ namespace NKikimr { DIV_CLASS("col-md-1") { TAG(TH5) { str << "Fresh"; } const char *comp = Fresh.CompactionInProgress() ? "Compaction In Progress" : "No Compaction"; - H6_CLASS ("text-info") {str << comp << " W:" << FreshCompWritesInFlight;} + H6_CLASS ("text-info") {str << comp << " R:" << FreshCompReadsInFlight << "/W:" << FreshCompWritesInFlight;} } DIV_CLASS("col-md-11") {Fresh.OutputHtml(str);} } @@ -220,6 +220,7 @@ namespace NKikimr { void TLevelIndex<TKey, TMemRec>::OutputProto(NKikimrVDisk::LevelIndexStat *stat) const { NKikimrVDisk::FreshStat *fresh = stat->mutable_fresh(); fresh->set_compaction_writes_in_flight(FreshCompWritesInFlight); + fresh->set_compaction_reads_in_flight(FreshCompReadsInFlight); Fresh.OutputProto(fresh); NKikimrVDisk::SliceStat *slice = stat->mutable_slice(); slice->set_compaction_writes_in_flight(HullCompWritesInFlight); diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h index 2261ffe3fb5..79d5c854d32 100644 --- a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h +++ b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h @@ -183,6 +183,7 @@ namespace NKikimr { ui64 CurEntryPointLsn = ui64(-1); ui64 PrevEntryPointLsn = ui64(-1); + TAtomic FreshCompReadsInFlight = 0; TAtomic FreshCompWritesInFlight = 0; TAtomic HullCompReadsInFlight = 0; TAtomic HullCompWritesInFlight = 0; diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp index 1ab46aaff69..8268554c68f 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp @@ -37,23 +37,30 @@ namespace NKikimr { const bool BarrierValidation; TDelayedResponses DelayedResponses; bool AllowGarbageCollection = false; + THugeBlobCtxPtr HugeBlobCtx; + ui32 MinREALHugeBlobInBytes; TFields(TIntrusivePtr<THullDs> hullDs, TIntrusivePtr<TLsnMngr> &&lsnMngr, TPDiskCtxPtr &&pdiskCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, const TActorId skeletonId, bool runHandoff, TActorSystem *as, - bool barrierValidation) + bool barrierValidation, + TActorId hugeKeeperId) : LogoBlobsRunTimeCtx(std::make_shared<TLogoBlobsRunTimeCtx>(lsnMngr, pdiskCtx, - skeletonId, runHandoff, hullDs->LogoBlobs)) + skeletonId, runHandoff, hullDs->LogoBlobs, hugeKeeperId)) , BlocksRunTimeCtx(std::make_shared<TBlocksRunTimeCtx>(lsnMngr, pdiskCtx, - skeletonId, runHandoff, hullDs->Blocks)) + skeletonId, runHandoff, hullDs->Blocks, TActorId())) , BarriersRunTimeCtx(std::make_shared<TBarriersRunTimeCtx>(lsnMngr, pdiskCtx, - skeletonId, runHandoff, hullDs->Barriers)) + skeletonId, runHandoff, hullDs->Barriers, TActorId())) , LsnMngr(std::move(lsnMngr)) , ActorSystem(as) , BarrierValidation(barrierValidation) + , HugeBlobCtx(std::move(hugeBlobCtx)) + , MinREALHugeBlobInBytes(minREALHugeBlobInBytes) {} void CutRecoveryLog(const TActorContext &ctx, std::unique_ptr<NPDisk::TEvCutLog> msg) { @@ -75,14 +82,17 @@ namespace NKikimr { THull::THull( TIntrusivePtr<TLsnMngr> lsnMngr, TPDiskCtxPtr pdiskCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, const TActorId skeletonId, bool runHandoff, THullDbRecovery &&uncond, TActorSystem *as, - bool barrierValidation) + bool barrierValidation, + TActorId hugeKeeperId) : THullDbRecovery(std::move(uncond)) - , Fields(std::make_unique<TFields>(HullDs, std::move(lsnMngr), std::move(pdiskCtx), - skeletonId, runHandoff, as, barrierValidation)) + , Fields(std::make_unique<TFields>(HullDs, std::move(lsnMngr), std::move(pdiskCtx), std::move(hugeBlobCtx), + minREALHugeBlobInBytes, skeletonId, runHandoff, as, barrierValidation, hugeKeeperId)) {} THull::~THull() = default; @@ -119,8 +129,8 @@ namespace NKikimr { activeActors.Insert(logNotifierAid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); Fields->SetLogNotifierActorId(logNotifierAid); // actor for LogoBlobs DB - HullDs->LogoBlobs->LIActor = ctx.RegisterWithSameMailbox(CreateLogoBlobsActor(config, HullDs, hullLogCtx, loggerId, - Fields->LogoBlobsRunTimeCtx, syncLogFirstLsnToKeep)); + HullDs->LogoBlobs->LIActor = ctx.RegisterWithSameMailbox(CreateLogoBlobsActor(config, HullDs, hullLogCtx, + Fields->HugeBlobCtx, Fields->MinREALHugeBlobInBytes, loggerId, Fields->LogoBlobsRunTimeCtx, syncLogFirstLsnToKeep)); activeActors.Insert(HullDs->LogoBlobs->LIActor, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); // actor for Blocks DB HullDs->Blocks->LIActor = ctx.RegisterWithSameMailbox(CreateBlocksActor(config, HullDs, hullLogCtx, loggerId, @@ -206,8 +216,6 @@ namespace NKikimr { ReplayAddLogoBlobCmd(ctx, id, partId, ingress, std::move(buffer), lsn, THullDbRecovery::NORMAL); // run compaction if required - CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false, - Fields->AllowGarbageCollection); } void THull::AddHugeLogoBlob( @@ -220,8 +228,7 @@ namespace NKikimr { ReplayAddHugeLogoBlobCmd(ctx, id, ingress, diskAddr, lsn, THullDbRecovery::NORMAL); // run compaction if required - CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false, - Fields->AllowGarbageCollection); + CompactFreshLogoBlobsIfRequired(ctx); } void THull::AddLogoBlob( @@ -234,8 +241,7 @@ namespace NKikimr { ReplayAddLogoBlobCmd(ctx, id, ingress, seg.Point(), THullDbRecovery::NORMAL); // run compaction if required - CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false, - Fields->AllowGarbageCollection); + CompactFreshLogoBlobsIfRequired(ctx); } void THull::AddBulkSst( @@ -280,7 +286,7 @@ namespace NKikimr { Fields->DelayedResponses.ConfirmLsn(lsn, replySender); // run compaction if required - CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, Fields->BlocksRunTimeCtx, ctx, false, + CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, nullptr, 0, Fields->BlocksRunTimeCtx, ctx, false, Fields->AllowGarbageCollection); } @@ -474,10 +480,9 @@ namespace NKikimr { ReplayAddGCCmd(ctx, record, ingress, seg.Last); // run compaction if required - CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false, - Fields->AllowGarbageCollection); - CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, Fields->BarriersRunTimeCtx, ctx, false, - Fields->AllowGarbageCollection); + CompactFreshLogoBlobsIfRequired(ctx); + CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, nullptr, 0, Fields->BarriersRunTimeCtx, ctx, + false, Fields->AllowGarbageCollection); } void THull::CollectPhantoms( @@ -615,12 +620,11 @@ namespace NKikimr { Y_DEBUG_ABORT_UNLESS(curLsn == seg.Last + 1); // run compaction if required - CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false, - Fields->AllowGarbageCollection); - CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, Fields->BlocksRunTimeCtx, ctx, false, - Fields->AllowGarbageCollection); - CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, Fields->BarriersRunTimeCtx, ctx, false, + CompactFreshLogoBlobsIfRequired(ctx); + CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, nullptr, 0, Fields->BlocksRunTimeCtx, ctx, false, Fields->AllowGarbageCollection); + CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, nullptr, 0, Fields->BarriersRunTimeCtx, ctx, + false, Fields->AllowGarbageCollection); } // run fresh segment or fresh appendix compaction if required @@ -633,7 +637,7 @@ namespace NKikimr { bool allowGarbageCollection) { // try to start fresh compaction - bool freshSegmentCompaction = CompactFreshSegmentIfRequired<TKey, TMemRec>(hullDs, rtCtx, ctx, false, + bool freshSegmentCompaction = CompactFreshSegmentIfRequired<TKey, TMemRec>(hullDs, nullptr, 0, rtCtx, ctx, false, allowGarbageCollection); if (!freshSegmentCompaction) { // if not, try to start appendix compaction @@ -707,4 +711,14 @@ namespace NKikimr { ctx.Send(HullDs->Barriers->LIActor, new TEvPermitGarbageCollection); } + void THull::ApplyHugeBlobSize(ui32 minREALHugeBlobInBytes, const TActorContext& ctx) { + Fields->MinREALHugeBlobInBytes = minREALHugeBlobInBytes; + ctx.Send(HullDs->LogoBlobs->LIActor, new TEvMinHugeBlobSizeUpdate(minREALHugeBlobInBytes)); + } + + void THull::CompactFreshLogoBlobsIfRequired(const TActorContext& ctx) { + CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->HugeBlobCtx, + Fields->MinREALHugeBlobInBytes, Fields->LogoBlobsRunTimeCtx, ctx, false, Fields->AllowGarbageCollection); + } + } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h index 91bbd872507..9d43370da57 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h @@ -1,6 +1,7 @@ #pragma once #include "defs.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_hulllogctx.h> +#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h> #include <ydb/core/blobstorage/vdisk/hulldb/cache_block/cache_block.h> #include <ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.h> #include <ydb/core/blobstorage/vdisk/hulldb/bulksst_add/hulldb_bulksst_add.h> @@ -60,11 +61,14 @@ namespace NKikimr { THull( TIntrusivePtr<TLsnMngr> lsnMngr, TPDiskCtxPtr pdiskCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, const TActorId skeletonId, bool runHandoff, THullDbRecovery &&uncond, TActorSystem *as, - bool barrierValidation); + bool barrierValidation, + TActorId hugeKeeperId); THull(const THull &) = delete; THull(THull &&) = default; THull &operator =(const THull &) = delete; @@ -210,6 +214,10 @@ namespace NKikimr { } void PermitGarbageCollection(const TActorContext& ctx); + + void ApplyHugeBlobSize(ui32 minREALHugeBlobInBytes, const TActorContext& ctx); + + void CompactFreshLogoBlobsIfRequired(const TActorContext& ctx); }; // FIXME: diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp index ca5c5626a30..1edb7cc955e 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp @@ -77,6 +77,8 @@ namespace NKikimr { template <class TKey, class TMemRec> void CompactFreshSegment( TIntrusivePtr<THullDs> &hullDs, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, std::shared_ptr<TLevelIndexRunTimeCtx<TKey, TMemRec>> &rtCtx, const TActorContext &ctx, bool allowGarbageCollection) @@ -105,8 +107,9 @@ namespace NKikimr { ui64 firstLsn = freshSegment->GetFirstLsn(); ui64 lastLsn = freshSegment->GetLastLsn(); std::unique_ptr<TFreshCompaction> compaction(new TFreshCompaction( - hullCtx, rtCtx, freshSegment, freshSegmentSnap, std::move(barriersSnap), std::move(levelSnap), - mergeElementsApproximation, it, firstLsn, lastLsn, TDuration::Max(), {}, allowGarbageCollection)); + hullCtx, rtCtx, std::move(hugeBlobCtx), minREALHugeBlobInBytes, freshSegment, freshSegmentSnap, + std::move(barriersSnap), std::move(levelSnap), mergeElementsApproximation, it, firstLsn, lastLsn, + TDuration::Max(), {}, allowGarbageCollection)); LOG_INFO(ctx, NKikimrServices::BS_HULLCOMP, VDISKP(hullCtx->VCtx->VDiskLogPrefix, @@ -172,6 +175,8 @@ namespace NKikimr { bool CompactionScheduled = false; TInstant NextCompactionWakeup; bool AllowGarbageCollection = false; + THugeBlobCtxPtr HugeBlobCtx; + ui32 MinREALHugeBlobInBytes; friend class TActorBootstrapped<TThis>; @@ -228,8 +233,8 @@ namespace NKikimr { void ScheduleCompaction(const TActorContext &ctx) { // schedule fresh if required - CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx, FullCompactionState.ForceFreshCompaction(RTCtx), - AllowGarbageCollection); + CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx, MinREALHugeBlobInBytes, RTCtx, ctx, + FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection); if (!Config->BaseInfo.ReadOnly && !RunLevelCompactionSelector(ctx)) { ScheduleCompactionWakeup(ctx); } @@ -255,10 +260,9 @@ namespace NKikimr { TLevelSliceForwardIterator it(HullDs->HullCtx, vec); it.SeekToFirst(); - std::unique_ptr<TLevelCompaction> compaction(new TLevelCompaction( - HullDs->HullCtx, RTCtx, nullptr, nullptr, std::move(barriersSnap), std::move(levelSnap), - mergeElementsApproximation, it, firstLsn, lastLsn, TDuration::Minutes(2), {}, - AllowGarbageCollection)); + std::unique_ptr<TLevelCompaction> compaction(new TLevelCompaction(HullDs->HullCtx, RTCtx, HugeBlobCtx, + MinREALHugeBlobInBytes, nullptr, nullptr, std::move(barriersSnap), std::move(levelSnap), + mergeElementsApproximation, it, firstLsn, lastLsn, TDuration::Minutes(2), {}, AllowGarbageCollection)); NActors::TActorId actorId = RunInBatchPool(ctx, compaction.release()); ActiveActors.Insert(actorId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); } @@ -417,8 +421,10 @@ namespace NKikimr { // run level committer TDiskPartVec removedHugeBlobs(CompactionTask->ExtractHugeBlobsToDelete()); + TDiskPartVec allocatedHugeBlobs(CompactionTask->GetHugeBlobsAllocated()); auto committer = std::make_unique<TAsyncLevelCommitter>(HullLogCtx, HullDbCommitterCtx, RTCtx->LevelIndex, - ctx.SelfID, std::move(chunksAdded), std::move(deleteChunks), std::move(removedHugeBlobs), wId); + ctx.SelfID, std::move(chunksAdded), std::move(deleteChunks), std::move(removedHugeBlobs), + std::move(allocatedHugeBlobs), wId); TActorId committerID = ctx.RegisterWithSameMailbox(committer.release()); ActiveActors.Insert(committerID, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); @@ -493,14 +499,14 @@ namespace NKikimr { // run fresh committer auto committer = std::make_unique<TAsyncFreshCommitter>(HullLogCtx, HullDbCommitterCtx, RTCtx->LevelIndex, ctx.SelfID, std::move(msg->CommitChunks), std::move(msg->ReservedChunks), - std::move(msg->FreedHugeBlobs), dbg.Str(), wId); + std::move(msg->FreedHugeBlobs), std::move(msg->AllocatedHugeBlobs), dbg.Str(), wId); auto aid = ctx.RegisterWithSameMailbox(committer.release()); ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); } else { Y_ABORT_UNLESS(RTCtx->LevelIndex->GetCompState() == TLevelIndexBase::StateCompInProgress); CompactionTask->CompactSsts.CompactionFinished(std::move(msg->SegVec), - std::move(msg->FreedHugeBlobs), msg->Aborted); + std::move(msg->FreedHugeBlobs), std::move(msg->AllocatedHugeBlobs), msg->Aborted); if (msg->Aborted) { // if the compaction was aborted, ensure there was no index change Y_ABORT_UNLESS(CompactionTask->GetSstsToAdd().Empty()); @@ -565,8 +571,8 @@ namespace NKikimr { if (FullCompactionState.Enabled()) { ScheduleCompaction(ctx); } else { - CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx, - FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection); + CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx, MinREALHugeBlobInBytes, RTCtx, + ctx, FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection); } break; case THullCommitFinished::CommitAdvanceLsn: @@ -589,8 +595,9 @@ namespace NKikimr { const ui64 freeUpToLsn = ev->Get()->FreeUpToLsn; RTCtx->SetFreeUpToLsn(freeUpToLsn); // we check if we need to start fresh compaction, FreeUpToLsn influence our decision - const bool freshCompStarted = CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx, - FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection); + const bool freshCompStarted = CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx, + MinREALHugeBlobInBytes, RTCtx, ctx, FullCompactionState.ForceFreshCompaction(RTCtx), + AllowGarbageCollection); // just for valid info output to the log bool moveEntryPointStarted = false; if (!freshCompStarted && !AdvanceCommitInProgress) { @@ -648,6 +655,8 @@ namespace NKikimr { case E::FRESH_ONLY: Y_ABORT_UNLESS(FreshOnlyCompactQ.empty() || FreshOnlyCompactQ.back().first <= confirmedLsn); FreshOnlyCompactQ.emplace_back(confirmedLsn, ev); + CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx, MinREALHugeBlobInBytes, RTCtx, ctx, + true, AllowGarbageCollection); // ask for forced fresh compaction break; } @@ -676,6 +685,10 @@ namespace NKikimr { AllowGarbageCollection = true; } + void Handle(TEvMinHugeBlobSizeUpdate::TPtr ev, const TActorContext& /*ctx*/) { + MinREALHugeBlobInBytes = ev->Get()->MinREALHugeBlobInBytes; + } + STRICT_STFUNC(StateFunc, HFunc(THullCommitFinished, Handle) HFunc(NPDisk::TEvCutLog, Handle) @@ -688,6 +701,7 @@ namespace NKikimr { HFunc(TEvents::TEvPoisonPill, HandlePoison) CFunc(TEvBlobStorage::EvPermitGarbageCollection, HandlePermitGarbageCollection) HFunc(TEvHugePreCompactResult, Handle) + HFunc(TEvMinHugeBlobSizeUpdate, Handle) ) public: @@ -699,6 +713,8 @@ namespace NKikimr { TIntrusivePtr<TVDiskConfig> config, TIntrusivePtr<THullDs> hullDs, std::shared_ptr<THullLogCtx> hullLogCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, TActorId loggerId, std::shared_ptr<TRunTimeCtx> rtCtx, std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep) @@ -721,6 +737,8 @@ namespace NKikimr { , CompactionTask(new TCompactionTask) , ActiveActors(RTCtx->LevelIndex->ActorCtx->ActiveActors) , LevelStat(HullDs->HullCtx->VCtx->VDiskCounters) + , HugeBlobCtx(std::move(hugeBlobCtx)) + , MinREALHugeBlobInBytes(minREALHugeBlobInBytes) {} }; @@ -728,12 +746,13 @@ namespace NKikimr { TIntrusivePtr<TVDiskConfig> config, TIntrusivePtr<THullDs> hullDs, std::shared_ptr<THullLogCtx> hullLogCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, TActorId loggerId, std::shared_ptr<TLevelIndexRunTimeCtx<TKeyLogoBlob, TMemRecLogoBlob>> rtCtx, std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep) { - - return new TLevelIndexActor<TKeyLogoBlob, TMemRecLogoBlob>( - config, hullDs, hullLogCtx, loggerId, rtCtx, syncLogFirstLsnToKeep); + return new TLevelIndexActor<TKeyLogoBlob, TMemRecLogoBlob>(config, hullDs, hullLogCtx, std::move(hugeBlobCtx), + minREALHugeBlobInBytes, loggerId, rtCtx,syncLogFirstLsnToKeep); } NActors::IActor* CreateBlocksActor( @@ -743,9 +762,8 @@ namespace NKikimr { TActorId loggerId, std::shared_ptr<TLevelIndexRunTimeCtx<TKeyBlock, TMemRecBlock>> rtCtx, std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep) { - - return new TLevelIndexActor<TKeyBlock, TMemRecBlock>( - config, hullDs, hullLogCtx, loggerId, rtCtx, syncLogFirstLsnToKeep); + return new TLevelIndexActor<TKeyBlock, TMemRecBlock>(config, hullDs, hullLogCtx, nullptr, 0, loggerId, rtCtx, + syncLogFirstLsnToKeep); } NActors::IActor* CreateBarriersActor( @@ -755,8 +773,7 @@ namespace NKikimr { TActorId loggerId, std::shared_ptr<TLevelIndexRunTimeCtx<TKeyBarrier, TMemRecBarrier>> rtCtx, std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep) { - - return new TLevelIndexActor<TKeyBarrier, TMemRecBarrier>( - config, hullDs, hullLogCtx, loggerId, rtCtx, syncLogFirstLsnToKeep); + return new TLevelIndexActor<TKeyBarrier, TMemRecBarrier>(config, hullDs, hullLogCtx, nullptr, 0, loggerId, rtCtx, + syncLogFirstLsnToKeep); } } diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h index 4a9b773f4b2..e740a9f47e0 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h @@ -2,6 +2,7 @@ #include "defs.h" #include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all.h> +#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h> namespace NKikimr { @@ -23,6 +24,7 @@ namespace NKikimr { const bool RunHandoff; const TIntrusivePtr<TLevelIndex<TKey, TMemRec>> LevelIndex; ui64 FreeUpToLsn = 0; + const TActorId HugeKeeperId; private: // ActorId of the LogCutterNotifier, that aggregates cut log lsn for the Hull database TActorId LogNotifierActorId; @@ -32,12 +34,14 @@ namespace NKikimr { TPDiskCtxPtr pdiskCtx, const TActorId skeletonId, bool runHandoff, - TIntrusivePtr<TLevelIndex<TKey, TMemRec>> levelIndex) + TIntrusivePtr<TLevelIndex<TKey, TMemRec>> levelIndex, + const TActorId hugeKeeperId) : LsnMngr(std::move(lsnMngr)) , PDiskCtx(std::move(pdiskCtx)) , SkeletonId(skeletonId) , RunHandoff(runHandoff) , LevelIndex(std::move(levelIndex)) + , HugeKeeperId(hugeKeeperId) { Y_ABORT_UNLESS(LsnMngr && PDiskCtx && LevelIndex); } @@ -79,6 +83,8 @@ namespace NKikimr { template <class TKey, class TMemRec> void CompactFreshSegment( TIntrusivePtr<THullDs> &hullDs, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, std::shared_ptr<TLevelIndexRunTimeCtx<TKey, TMemRec>> &rtCtx, const TActorContext &ctx, bool allowGarbageCollection); @@ -86,6 +92,8 @@ namespace NKikimr { template <class TKey, class TMemRec> bool CompactFreshSegmentIfRequired( TIntrusivePtr<THullDs> &hullDs, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, std::shared_ptr<TLevelIndexRunTimeCtx<TKey, TMemRec>> &rtCtx, const TActorContext &ctx, bool force, @@ -94,7 +102,8 @@ namespace NKikimr { ui64 yardFreeUpToLsn = rtCtx->GetFreeUpToLsn(); bool compact = hullDs->HullCtx->FreshCompaction && rtCtx->LevelIndex->NeedsFreshCompaction(yardFreeUpToLsn, force); if (compact) { - CompactFreshSegment<TKey, TMemRec>(hullDs, rtCtx, ctx, allowGarbageCollection); + CompactFreshSegment<TKey, TMemRec>(hullDs, std::move(hugeBlobCtx), minREALHugeBlobInBytes, rtCtx, ctx, + allowGarbageCollection); } return compact; } @@ -106,6 +115,8 @@ namespace NKikimr { TIntrusivePtr<TVDiskConfig> config, TIntrusivePtr<THullDs> hullDs, std::shared_ptr<THullLogCtx> hullLogCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, TActorId loggerId, std::shared_ptr<TLevelIndexRunTimeCtx<TKeyLogoBlob, TMemRecLogoBlob>> rtCtx, std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep); diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h index d6ab9b6dabc..0abc2be976e 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h @@ -62,6 +62,7 @@ namespace NKikimr { TVector<ui32> CommitChunks; // chunks to commit within this log entry TVector<ui32> DeleteChunks; // chunks to delete TDiskPartVec RemovedHugeBlobs; // freed huge blobs + TDiskPartVec AllocatedHugeBlobs; TLevelSegmentPtr ReplSst; // pointer to replicated SST ui32 NumRecoveredBlobs; // number of blobs in this SST (valid only for replicated tables) bool DeleteToDecommitted; @@ -70,10 +71,12 @@ namespace NKikimr { THullCommitMeta(TVector<ui32>&& chunksAdded, TVector<ui32>&& chunksDeleted, TDiskPartVec&& removedHugeBlobs, + TDiskPartVec&& allocatedHugeBlobs, bool prevSliceActive) : CommitChunks(std::move(chunksAdded)) , DeleteChunks(std::move(chunksDeleted)) , RemovedHugeBlobs(std::move(removedHugeBlobs)) + , AllocatedHugeBlobs(std::move(allocatedHugeBlobs)) , NumRecoveredBlobs(0) , DeleteToDecommitted(prevSliceActive) {} @@ -140,9 +143,9 @@ namespace NKikimr { // notify delayed deleter when log record is actually written; we MUST ensure that updates are coming in // order of increasing LSN's; this is achieved automatically as all actors reside on the same mailbox LevelIndex->DelayedCompactionDeleterInfo->Update(LsnSeg.Last, std::move(Metadata.RemovedHugeBlobs), - CommitRecord.DeleteToDecommitted ? CommitRecord.DeleteChunks : TVector<TChunkIdx>(), - PDiskSignatureForHullDbKey<TKey>(), WId, ctx, Ctx->HugeKeeperId, Ctx->SkeletonId, Ctx->PDiskCtx, - Ctx->HullCtx->VCtx); + std::move(Metadata.AllocatedHugeBlobs), CommitRecord.DeleteToDecommitted ? CommitRecord.DeleteChunks : + TVector<TChunkIdx>(), PDiskSignatureForHullDbKey<TKey>(), WId, ctx, Ctx->HugeKeeperId, Ctx->SkeletonId, + Ctx->PDiskCtx, Ctx->HullCtx->VCtx); NPDisk::TEvLogResult* msg = ev->Get(); @@ -223,6 +226,7 @@ namespace NKikimr { NKikimrVDiskData::THullDbEntryPoint pb; LevelIndex->SerializeToProto(*pb.MutableLevelIndex()); Metadata.RemovedHugeBlobs.SerializeToProto(*pb.MutableRemovedHugeBlobs()); + Metadata.AllocatedHugeBlobs.SerializeToProto(*pb.MutableAllocatedHugeBlobs()); return THullDbSignatureRoutines::Serialize(pb); } @@ -323,7 +327,7 @@ namespace NKikimr { std::move(levelIndex), notifyID, TActorId(), - {TVector<ui32>(), TVector<ui32>(), TDiskPartVec(), false}, + {TVector<ui32>(), TVector<ui32>(), TDiskPartVec(), TDiskPartVec(), false}, callerInfo, 0) {} @@ -351,6 +355,7 @@ namespace NKikimr { TVector<ui32>&& chunksAdded, TVector<ui32>&& chunksDeleted, TDiskPartVec&& removedHugeBlobs, + TDiskPartVec&& allocatedHugeBlobs, const TString &callerInfo, ui64 wId) : TBase(std::move(hullLogCtx), @@ -358,7 +363,7 @@ namespace NKikimr { std::move(levelIndex), notifyID, TActorId(), - {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), false}, + {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), std::move(allocatedHugeBlobs), false}, callerInfo, wId) {} @@ -383,13 +388,14 @@ namespace NKikimr { TVector<ui32>&& chunksAdded, TVector<ui32>&& chunksDeleted, TDiskPartVec&& removedHugeBlobs, + TDiskPartVec&& allocatedHugeBlobs, ui64 wId) : TBase(std::move(hullLogCtx), std::move(ctx), std::move(levelIndex), notifyID, TActorId(), - {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), true}, + {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), std::move(allocatedHugeBlobs), true}, TString(), wId) {} diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h index c9e7af00039..73ffcdc8c39 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h @@ -30,6 +30,7 @@ namespace NKikimr { TIntrusivePtr<TFreshSegment> FreshSegment; // huge blobs to delete after compaction TDiskPartVec FreedHugeBlobs; + TDiskPartVec AllocatedHugeBlobs; // was the compaction process aborted by some reason? bool Aborted = false; @@ -85,7 +86,8 @@ namespace NKikimr { // messages we have to send to Yard TVector<std::unique_ptr<IEventBase>> MsgsForYard; - TActorId SkeletonId; + const TActorId SkeletonId; + const TActorId HugeKeeperId; bool IsAborting = false; ui32 PendingResponses = 0; @@ -140,13 +142,18 @@ namespace NKikimr { // there are events, we send them to yard; worker internally controls all in flight limits and does not // generate more events than allowed; this function returns boolean status indicating whether compaction job // is finished or not - const bool done = Worker.MainCycle(MsgsForYard); + std::vector<ui32> *slotsToAllocate = nullptr; + const bool done = Worker.MainCycle(MsgsForYard, &slotsToAllocate); // check if there are messages we have for yard for (std::unique_ptr<IEventBase>& msg : MsgsForYard) { ctx.Send(PDiskCtx->PDiskId, msg.release()); ++PendingResponses; } MsgsForYard.clear(); + // send slots to allocate to huge keeper, if any + if (slotsToAllocate) { + ctx.Send(HugeKeeperId, new TEvHugeAllocateSlots(std::move(*slotsToAllocate))); + } // when done, continue with other state if (done) { Finalize(ctx); @@ -228,11 +235,17 @@ namespace NKikimr { MainCycle(ctx); } + void Handle(TEvHugeAllocateSlotsResult::TPtr ev, const TActorContext& ctx) { + Worker.Apply(ev->Get()); + MainCycle(ctx); + } + STRICT_STFUNC(WorkFunc, HFunc(NPDisk::TEvChunkReserveResult, HandleYardResponse) HFunc(NPDisk::TEvChunkWriteResult, HandleYardResponse) HFunc(NPDisk::TEvChunkReadResult, HandleYardResponse) HFunc(TEvRestoreCorruptedBlobResult, Handle) + HFunc(TEvHugeAllocateSlotsResult, Handle) HFunc(TEvents::TEvPoisonPill, HandlePoison) ) ///////////////////////// WORK: END ///////////////////////////////////////////////// @@ -264,6 +277,11 @@ namespace NKikimr { PDiskSignatureForHullDbKey<TKey>().ToString().data(), CompactionID, (FreshSegment ? "true" : "false"), Worker.GetFreedHugeBlobs().ToString().data())); msg->FreedHugeBlobs = IsAborting ? TDiskPartVec() : Worker.GetFreedHugeBlobs(); + msg->AllocatedHugeBlobs = IsAborting ? TDiskPartVec() : Worker.GetAllocatedHugeBlobs(); + + if (IsAborting) { // release previously preallocated slots for huge blobs if we are aborting + ctx.Send(HugeKeeperId, new TEvHugeDropAllocatedSlots(Worker.GetAllocatedHugeBlobs().Vec)); + } // chunks to commit msg->CommitChunks = IsAborting ? TVector<ui32>() : Worker.GetCommitChunks(); @@ -304,6 +322,8 @@ namespace NKikimr { THullCompaction(THullCtxPtr hullCtx, const std::shared_ptr<TLevelIndexRunTimeCtx> &rtCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, TIntrusivePtr<TFreshSegment> freshSegment, std::shared_ptr<TFreshSegmentSnapshot> freshSegmentSnap, TBarriersSnapshot &&barriersSnap, @@ -326,10 +346,11 @@ namespace NKikimr { , Hmp(CreateHandoffMap<TKey, TMemRec>(HullCtx, rtCtx->RunHandoff, rtCtx->SkeletonId)) , Gcmp(CreateGcMap<TKey, TMemRec>(HullCtx, mergeElementsApproximation, allowGarbageCollection)) , It(it) - , Worker(HullCtx, PDiskCtx, rtCtx->LevelIndex, it, (bool)FreshSegment, firstLsn, lastLsn, restoreDeadline, - partitionKey) + , Worker(HullCtx, PDiskCtx, std::move(hugeBlobCtx), minREALHugeBlobInBytes, rtCtx->LevelIndex, it, + static_cast<bool>(FreshSegment), firstLsn, lastLsn, restoreDeadline, partitionKey) , CompactionID(TAppData::RandomProvider->GenRand64()) , SkeletonId(rtCtx->SkeletonId) + , HugeKeeperId(rtCtx->HugeKeeperId) {} }; diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h index 2c8aed0c3d8..72cc57b617c 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h @@ -15,18 +15,18 @@ namespace NKikimr { ui64 Id; ui32 NumReads; TDiskPart PreallocatedLocation; - TDiskBlobMergerWithMask Merger; - NMatrix::TVectorType PartsToStore; + TDiskBlobMerger Merger; TLogoBlobID BlobId; + bool IsInline; TItem(ui64 id, ui32 numReads, const TDiskPart& preallocatedLocation, const TDiskBlobMerger& merger, - NMatrix::TVectorType partsToStore, const TLogoBlobID& blobId) + const TLogoBlobID& blobId, bool isInline) : Id(id) , NumReads(numReads) , PreallocatedLocation(preallocatedLocation) - , Merger(merger, partsToStore) - , PartsToStore(partsToStore) + , Merger(merger) , BlobId(blobId) + , IsInline(isInline) {} }; @@ -57,12 +57,12 @@ namespace NKikimr { ProcessItemQueue(); } - void AddReadDiskBlob(ui64 id, TRope&& buffer, NMatrix::TVectorType expectedParts) { + void AddReadDiskBlob(ui64 id, TRope&& buffer, ui8 partIdx) { Y_ABORT_UNLESS(Started); Y_ABORT_UNLESS(ItemQueue); TItem& item = ItemQueue.front(); Y_ABORT_UNLESS(item.Id == id); - item.Merger.Add(TDiskBlob(&buffer, expectedParts, GType, item.BlobId)); + item.Merger.AddPart(std::move(buffer), GType, TLogoBlobID(item.BlobId, partIdx + 1)); Y_ABORT_UNLESS(item.NumReads > 0); if (!--item.NumReads) { ProcessItemQueue(); @@ -89,12 +89,9 @@ namespace NKikimr { } void ProcessItem(TItem& item) { - // ensure that we have all the parts we must have - Y_ABORT_UNLESS(item.Merger.GetDiskBlob().GetParts() == item.PartsToStore); - // get newly generated blob raw content and put it into writer queue static_cast<TDerived&>(*this).ProcessItemImpl(item.PreallocatedLocation, item.Merger.CreateDiskBlob(Arena, - AddHeader)); + AddHeader), item.IsInline); } }; diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp index aa24409982a..4f278eb1f5a 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp @@ -8,7 +8,7 @@ using namespace NKikimr; -const TBlobStorageGroupType GType(TBlobStorageGroupType::ErasureNone); +const TBlobStorageGroupType GType(TBlobStorageGroupType::Erasure4Plus2Block); std::unordered_map<TString, size_t> StringToId; std::deque<TRope> IdToRope; @@ -29,7 +29,7 @@ class TTestDeferredQueue : public TDeferredItemQueueBase<TTestDeferredQueue> { void StartImpl() { } - void ProcessItemImpl(const TDiskPart& /*preallocatedLocation*/, const TRope& buffer) { + void ProcessItemImpl(const TDiskPart& /*preallocatedLocation*/, const TRope& buffer, bool /*isInline*/) { Results.push_back(GetResMapId(buffer)); } @@ -47,23 +47,16 @@ public: Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) { Y_UNIT_TEST(Basic) { - TRope parts[6] = { - TRope(TString("AAAAAA")), - TRope(TString("BBBBBB")), - TRope(TString("CCCCCC")), - TRope(TString("DDDDDD")), - TRope(TString("EEEEEE")), - TRope(TString("FFFFFF")), - }; - - ui32 fullDataSize = 32; + TString data = "AAABBBCCCDDDEEEFFF"; + TLogoBlobID blobId(0, 0, 0, 0, data.size(), 0); + std::array<TRope, 6> parts; + ErasureSplit(static_cast<TErasureType::ECrcMode>(blobId.CrcMode()), GType, TRope(data), parts); std::unordered_map<TString, size_t> resm; struct TItem { ssize_t BlobId; NMatrix::TVectorType BlobParts; - NMatrix::TVectorType PartsToStore; TVector<std::pair<size_t, NMatrix::TVectorType>> DiskData; size_t Expected; }; @@ -72,17 +65,15 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) { TRopeArena arena(&TRopeArenaBackend::Allocate); auto process = [&](ui32 mem, ui32 masks0, ui32 masks1, ui32 numDiskParts) { - ui32 masks[3] = {masks0, masks1, 0}; + ui32 masks[2] = {masks0, masks1}; TItem item; TDiskBlobMerger expm; // create initial memory merger - for (ui8 i = 0; i < 6; ++i) { + for (ui8 i = 0; i < GType.TotalPartCount(); ++i) { if (mem >> i & 1) { - TRope buf = TDiskBlob::Create(fullDataSize, i + 1, 6, TRope(parts[i]), arena, true); - TDiskBlob blob(&buf, NMatrix::TVectorType::MakeOneHot(i, 6), GType, TLogoBlobID(0, 0, 0, 0, parts[i].GetSize(), 0)); - expm.Add(blob); + expm.AddPart(TRope(parts[i]), GType, TLogoBlobID(blobId, i + 1)); } } @@ -95,47 +86,28 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) { item.BlobParts = expm.GetDiskBlob().GetParts(); // generate disk parts - ui64 wholeMask = mem; for (ui32 i = 0; i < numDiskParts; ++i) { const ui32 mask = masks[i]; Y_ABORT_UNLESS(mask); TDiskBlobMerger m; - for (ui8 i = 0; i < 6; ++i) { + for (ui8 i = 0; i < GType.TotalPartCount(); ++i) { if (mask >> i & 1) { - TRope buf = TDiskBlob::Create(fullDataSize, i + 1, 6, TRope(parts[i]), arena, true); - TDiskBlob blob(&buf, NMatrix::TVectorType::MakeOneHot(i, 6), GType, TLogoBlobID(0, 0, 0, 0, parts[i].GetSize(), 0)); - m.Add(blob); - expm.Add(blob); + m.AddPart(TRope(parts[i]), GType, TLogoBlobID(blobId, i + 1)); + expm.AddPart(TRope(parts[i]), GType, TLogoBlobID(blobId, i + 1)); } } TRope buf = m.CreateDiskBlob(arena, true); Y_ABORT_UNLESS(buf); item.DiskData.emplace_back(GetResMapId(buf), m.GetDiskBlob().GetParts()); - - wholeMask |= mask; } // generate parts to store vector and store item - for (ui32 p = 1; p < 64; ++p) { - if ((p & wholeMask) == p) { - NMatrix::TVectorType v(0, 6); - for (ui8 i = 0; i < 6; ++i) { - if (p >> i & 1) { - v.Set(i); - } - } - item.PartsToStore = v; - - TDiskBlobMergerWithMask mx; - mx.SetFilterMask(v); - mx.Add(expm.GetDiskBlob()); - item.Expected = GetResMapId(mx.CreateDiskBlob(arena, true)); - - items.push_back(item); - } + if (!expm.Empty()) { + item.Expected = GetResMapId(expm.CreateDiskBlob(arena, true)); + items.push_back(item); } }; @@ -173,22 +145,28 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) { // prepare item merger for this sample item TDiskBlobMerger merger; if (item.BlobId != -1) { - merger.Add(TDiskBlob(&IdToRope[item.BlobId], item.BlobParts, GType, TLogoBlobID(0, 0, 0, 0, 6, 0))); + merger.Add(TDiskBlob(&IdToRope[item.BlobId], item.BlobParts, GType, blobId)); } // put the item to queue - q.Put(id, item.DiskData.size(), TDiskPart(0, 0, 0), std::move(merger), item.PartsToStore, TLogoBlobID(0, 0, 0, 0, 6, 0)); - for (auto& p : item.DiskData) { - itemQueue.emplace(id, IdToRope[p.first], p.second); + ui32 numReads = 0; + for (auto& [ropeId, parts] : item.DiskData) { + itemQueue.emplace(id, IdToRope[ropeId], parts); + numReads += parts.CountBits(); } + q.Put(id, numReads, TDiskPart(0, 0, 0), std::move(merger), blobId, true); referenceResults.push_back(item.Expected); ++id; } q.Start(); while (itemQueue) { - auto& front = itemQueue.front(); - q.AddReadDiskBlob(std::get<0>(front), std::move(std::get<1>(front)), std::get<2>(front)); + auto& [id, buffer, parts] = itemQueue.front(); + TDiskBlob blob(&buffer, parts, GType, blobId); + for (ui8 partIdx : parts) { + TRope holder; + q.AddReadDiskBlob(id, TRope(blob.GetPart(partIdx, &holder)), partIdx); + } itemQueue.pop(); UNIT_ASSERT_VALUES_EQUAL(q.AllProcessed(), itemQueue.empty()); } @@ -196,7 +174,7 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) { UNIT_ASSERT_VALUES_EQUAL(referenceResults.size(), q.Results.size()); for (ui32 i = 0; i < referenceResults.size(); ++i) { - UNIT_ASSERT_EQUAL(referenceResults[i], q.Results[i]); + UNIT_ASSERT_VALUES_EQUAL(IdToRope[referenceResults[i]].ConvertToString(), IdToRope[q.Results[i]].ConvertToString()); } } diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h index b2bb2e1eec8..6fa069c7032 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h @@ -7,18 +7,20 @@ #include <ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h> #include <ydb/core/blobstorage/vdisk/hulldb/blobstorage_hullgcmap.h> #include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h> +#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h> namespace NKikimr { template<typename TKey, typename TMemRec, typename TIterator> class THullCompactionWorker { + static constexpr bool LogoBlobs = std::is_same_v<TKey, TKeyLogoBlob>; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // COMMON TYPE ALIASES //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // handoff map using THandoffMap = NKikimr::THandoffMap<TKey, TMemRec>; - using TTransformedItem = typename THandoffMap::TTransformedItem; using THandoffMapPtr = TIntrusivePtr<THandoffMap>; // garbage collector map @@ -41,26 +43,41 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TDeferredItemQueue : public TDeferredItemQueueBase<TDeferredItemQueue> { - TWriter *Writer = nullptr; + THullCompactionWorker *Worker = nullptr; friend class TDeferredItemQueueBase<TDeferredItemQueue>; - void StartImpl(TWriter *writer) { - Y_ABORT_UNLESS(!Writer); - Writer = writer; - Y_ABORT_UNLESS(Writer); + void StartImpl(THullCompactionWorker *worker) { + Y_ABORT_UNLESS(!Worker); + Worker = worker; + Y_ABORT_UNLESS(Worker); + Y_ABORT_UNLESS(Worker->WriterPtr); } - void ProcessItemImpl(const TDiskPart& preallocatedLocation, TRope&& buffer) { - TDiskPart writtenLocation = Writer->PushDataOnly(std::move(buffer)); + void ProcessItemImpl(const TDiskPart& preallocatedLocation, TRope&& buffer, bool isInline) { + Y_DEBUG_ABORT_UNLESS(Worker); - // ensure that item was written into preallocated position - Y_ABORT_UNLESS(writtenLocation == preallocatedLocation); + if (isInline) { + const TDiskPart writtenLocation = Worker->WriterPtr->PushDataOnly(std::move(buffer)); + Y_ABORT_UNLESS(writtenLocation == preallocatedLocation); + } else { + Y_ABORT_UNLESS(preallocatedLocation.Size == buffer.GetSize()); + size_t fullSize = buffer.GetSize(); + if (const size_t misalign = fullSize % Worker->PDiskCtx->Dsk->AppendBlockSize) { + fullSize += Worker->PDiskCtx->Dsk->AppendBlockSize - misalign; + } + auto partsPtr = MakeIntrusive<NPDisk::TEvChunkWrite::TRopeAlignedParts>(std::move(buffer), fullSize); + void *cookie = nullptr; + auto write = std::make_unique<NPDisk::TEvChunkWrite>(Worker->PDiskCtx->Dsk->Owner, + Worker->PDiskCtx->Dsk->OwnerRound, preallocatedLocation.ChunkIdx, preallocatedLocation.Offset, + partsPtr, cookie, true, NPriWrite::HullComp, false); + Worker->PendingWrites.push_back(std::move(write)); + } } void FinishImpl() { - Y_ABORT_UNLESS(Writer); - Writer = nullptr; + Y_ABORT_UNLESS(Worker); + Worker = nullptr; } public: @@ -77,6 +94,7 @@ namespace NKikimr { enum class EState { Invalid, // invalid state; this state should never be reached GetNextItem, // going to extract next item for processing or finish if there are no more items + WaitForSlotAllocation, // waiting for slot allocation from huge keeper TryProcessItem, // trying to write item into SST WaitingForDeferredItems, FlushingSST, // flushing SST to disk @@ -97,6 +115,8 @@ namespace NKikimr { // basic contexts THullCtxPtr HullCtx; TPDiskCtxPtr PDiskCtx; + THugeBlobCtxPtr HugeBlobCtx; + ui32 MinREALHugeBlobInBytes; // Group Type const TBlobStorageGroupType GType; @@ -132,11 +152,12 @@ namespace NKikimr { // record merger for compaction TCompactRecordMerger IndexMerger; - // current handoff-transformed item - const TTransformedItem *TransformedItem = nullptr; + // current handoff-transformed MemRec + std::optional<TMemRec> MemRec; // SST writer std::unique_ptr<TWriter> WriterPtr; + bool WriterHasPendingOperations = false; // number of chunks we have asked to reserve, but not yet confirmed ui32 ChunkReservePending = 0; @@ -154,10 +175,11 @@ namespace NKikimr { ui32 InFlightReads = 0; // maximum number of such requests - ui32 MaxInFlightReads; + ui32 MaxInFlightReads = 0; // vector of freed huge blobs TDiskPartVec FreedHugeBlobs; + TDiskPartVec AllocatedHugeBlobs; // generated level segments TVector<TIntrusivePtr<TLevelSegment>> LevelSegments; @@ -173,14 +195,14 @@ namespace NKikimr { struct TBatcherPayload { ui64 Id = 0; - NMatrix::TVectorType LocalParts; // a bit vector of local parts we are going to read from this disk blob + ui8 PartIdx; TLogoBlobID BlobId; TDiskPart Location; TBatcherPayload() = default; - TBatcherPayload(ui64 id, NMatrix::TVectorType localParts, TLogoBlobID blobId, TDiskPart location) + TBatcherPayload(ui64 id, ui8 partIdx, TLogoBlobID blobId, TDiskPart location) : Id(id) - , LocalParts(localParts) + , PartIdx(partIdx) , BlobId(blobId) , Location(location) {} @@ -193,11 +215,8 @@ namespace NKikimr { TDeferredItemQueue DeferredItems; ui64 NextDeferredItemId = 1; - // previous key (in case of logoblobs) - TKey PreviousKey; - - // is this the first key? - bool IsFirstKey = true; + TKey Key; // current key + std::optional<TKey> PreviousKey; // previous key (nullopt for the first iteration) public: struct TStatistics { @@ -260,13 +279,18 @@ namespace NKikimr { TStatistics Statistics; TDuration RestoreDeadline; + // Partition key is used for splitting resulting SSTs by the PartitionKey if present. Partition key // is used for compaction policy implementation to limit number of intermediate chunks durint compaction. std::optional<TKey> PartitionKey; + std::deque<std::unique_ptr<NPDisk::TEvChunkWrite>> PendingWrites; + public: THullCompactionWorker(THullCtxPtr hullCtx, TPDiskCtxPtr pdiskCtx, + THugeBlobCtxPtr hugeBlobCtx, + ui32 minREALHugeBlobInBytes, TIntrusivePtr<TLevelIndex> levelIndex, const TIterator& it, bool isFresh, @@ -276,6 +300,8 @@ namespace NKikimr { std::optional<TKey> partitionKey) : HullCtx(std::move(hullCtx)) , PDiskCtx(std::move(pdiskCtx)) + , HugeBlobCtx(std::move(hugeBlobCtx)) + , MinREALHugeBlobInBytes(minREALHugeBlobInBytes) , GType(HullCtx->VCtx->Top->GType) , LevelIndex(std::move(levelIndex)) , FirstLsn(firstLsn) @@ -295,8 +321,8 @@ namespace NKikimr { if (IsFresh) { ChunksToUse = HullCtx->HullSstSizeInChunksFresh; MaxInFlightWrites = HullCtx->FreshCompMaxInFlightWrites; - MaxInFlightReads = 0; - ReadsInFlight = nullptr; + MaxInFlightReads = HullCtx->FreshCompMaxInFlightReads; + ReadsInFlight = &LevelIndex->FreshCompReadsInFlight; WritesInFlight = &LevelIndex->FreshCompWritesInFlight; } else { ChunksToUse = HullCtx->HullSstSizeInChunksLevel; @@ -315,7 +341,7 @@ namespace NKikimr { // main cycle function; return true if compaction is finished and compaction actor can proceed to index load; // when there is more work to do, return false; MUST NOT return true unless all pending requests are finished - bool MainCycle(TVector<std::unique_ptr<IEventBase>>& msgsForYard) { + bool MainCycle(TVector<std::unique_ptr<IEventBase>>& msgsForYard, std::vector<ui32> **slotAllocations) { for (;;) { switch (State) { case EState::Invalid: @@ -323,35 +349,28 @@ namespace NKikimr { case EState::GetNextItem: if (It.Valid()) { - const TKey& key = It.GetCurKey(); - if (IsFirstKey) { - IsFirstKey = false; - } else { - Y_ABORT_UNLESS(!key.IsSameAs(PreviousKey), "duplicate keys: %s -> %s", - PreviousKey.ToString().data(), key.ToString().data()); - } - PreviousKey = key; + Key = It.GetCurKey(); + Y_ABORT_UNLESS(!PreviousKey || *PreviousKey < Key, "duplicate keys: %s -> %s", + PreviousKey->ToString().data(), Key.ToString().data()); // iterator is valid and we have one more item to process; instruct merger whether we want // data or not and proceed to TryProcessItem state Y_ABORT_UNLESS(GcmpIt.Valid()); - IndexMerger.SetLoadDataMode(GcmpIt.KeepData()); It.PutToMerger(&IndexMerger); const bool haveToProcessItem = PreprocessItem(); - if (haveToProcessItem) { + if (!haveToProcessItem) { + FinishItem(); + } else if (TDataMerger& dataMerger = IndexMerger.GetDataMerger(); !LogoBlobs || + dataMerger.GetSlotsToAllocate().empty()) { State = EState::TryProcessItem; } else { - FinishItem(); + State = EState::WaitForSlotAllocation; + *slotAllocations = &dataMerger.GetSlotsToAllocate(); + return false; // expect allocated slots to continue compacting } } else if (WriterPtr) { - // start processing deferred items - DeferredItems.Start(WriterPtr.get()); - // start batcher - ReadBatcher.Start(); - // iterator is not valid and we have writer with items (because we don't create writer - // unless there are actual items we want to write); proceed to WaitingForDeferredItems state - State = EState::WaitingForDeferredItems; + StartCollectingDeferredItems(); } else { // iterator is not valid and we have no writer -- so just proceed to WaitForPendingRequests // state and finish @@ -359,18 +378,17 @@ namespace NKikimr { } break; + case EState::WaitForSlotAllocation: + return false; + case EState::TryProcessItem: // ensure we have transformed item - Y_ABORT_UNLESS(TransformedItem); + Y_ABORT_UNLESS(MemRec); // try to process it - ETryProcessItemStatus status; - status = TryProcessItem(); - switch (status) { + switch (TryProcessItem()) { case ETryProcessItemStatus::Success: - // try to send some messages if this is the fresh segment - if (IsFresh) { - ProcessPendingMessages(msgsForYard); - } + // try to send some messages if needed + ProcessPendingMessages(msgsForYard); // finalize item FinishItem(); // continue with next item @@ -387,18 +405,13 @@ namespace NKikimr { return false; case ETryProcessItemStatus::FinishSST: - // start processing deferred items - DeferredItems.Start(WriterPtr.get()); - // start batcher - ReadBatcher.Start(); - // wait - State = EState::WaitingForDeferredItems; + StartCollectingDeferredItems(); break; } break; case EState::WaitingForDeferredItems: - ProcessPendingMessages(msgsForYard); + ProcessPendingMessages(msgsForYard); // issue any messages generated by deferred items queue if (!DeferredItems.AllProcessed()) { return false; } @@ -408,17 +421,20 @@ namespace NKikimr { ReadBatcher.Finish(); break; - case EState::FlushingSST: - // do not continue processing if there are too many writes in flight - if (InFlightWrites >= MaxInFlightWrites) { + case EState::FlushingSST: { + // if MemRec is set, then this state was invoked from the TryProcessItem call + const bool finished = FlushSST(); + State = !finished ? State : MemRec ? EState::TryProcessItem : EState::GetNextItem; + ProcessPendingMessages(msgsForYard); // issue any generated messages + if (finished) { + Y_ABORT_UNLESS(!WriterPtr->GetPendingMessage()); + WriterPtr.reset(); + } else { + Y_ABORT_UNLESS(InFlightWrites == MaxInFlightWrites); return false; } - // try to flush SST - if (FlushSST(msgsForYard)) { - // we return to state from which finalization was invoked - State = TransformedItem ? EState::TryProcessItem : EState::GetNextItem; - } break; + } case EState::WaitForPendingRequests: // wait until all writes succeed @@ -440,6 +456,12 @@ namespace NKikimr { } } + void StartCollectingDeferredItems() { + DeferredItems.Start(this); + ReadBatcher.Start(); + State = EState::WaitingForDeferredItems; + } + TEvRestoreCorruptedBlob *Apply(NPDisk::TEvChunkReadResult *msg, TInstant now) { AtomicDecrement(*ReadsInFlight); Y_ABORT_UNLESS(InFlightReads > 0); @@ -459,13 +481,9 @@ namespace NKikimr { auto& item = msg->Items.front(); switch (item.Status) { case NKikimrProto::OK: { - std::array<TRope, 8> parts; - ui32 numParts = 0; - for (ui32 i = item.Needed.FirstPosition(); i != item.Needed.GetSize(); i = item.Needed.NextPosition(i)) { - parts[numParts++] = std::move(item.Parts[i]); - } - DeferredItems.AddReadDiskBlob(item.Cookie, TDiskBlob::CreateFromDistinctParts(&parts[0], - &parts[numParts], item.Needed, item.BlobId.BlobSize(), Arena, HullCtx->AddHeader), item.Needed); + Y_DEBUG_ABORT_UNLESS(item.Needed.CountBits() == 1); + const ui8 partIdx = item.Needed.FirstPosition(); + DeferredItems.AddReadDiskBlob(item.Cookie, std::move(item.Parts[partIdx]), partIdx); return ProcessReadBatcher(now); } @@ -491,11 +509,12 @@ namespace NKikimr { while (ReadBatcher.GetResultItem(&serial, &payload, &status, &buffer)) { if (status == NKikimrProto::CORRUPTED) { ExpectingBlobRestoration = true; - TEvRestoreCorruptedBlob::TItem item(payload.BlobId, payload.LocalParts, GType, payload.Location, payload.Id); + const auto needed = NMatrix::TVectorType::MakeOneHot(payload.PartIdx, GType.TotalPartCount()); + TEvRestoreCorruptedBlob::TItem item(payload.BlobId, needed, GType, payload.Location, payload.Id); return new TEvRestoreCorruptedBlob(now + RestoreDeadline, {1u, item}, false, true); } else { Y_ABORT_UNLESS(status == NKikimrProto::OK); - DeferredItems.AddReadDiskBlob(payload.Id, TRope(std::move(buffer)), payload.LocalParts); + DeferredItems.AddReadDiskBlob(payload.Id, TRope(std::move(buffer)), payload.PartIdx); } } return nullptr; @@ -517,14 +536,28 @@ namespace NKikimr { AllocatedChunks.insert(AllocatedChunks.end(), msg->ChunkIds.begin(), msg->ChunkIds.end()); } + void Apply(TEvHugeAllocateSlotsResult *msg) { + if constexpr (LogoBlobs) { + Y_DEBUG_ABORT_UNLESS(State == EState::WaitForSlotAllocation); + State = EState::TryProcessItem; + for (const TDiskPart& p : msg->Locations) { // remember newly allocated slots for entrypoint + AllocatedHugeBlobs.PushBack(p); + } + IndexMerger.GetDataMerger().ApplyAllocatedSlots(msg->Locations); + } else { + Y_ABORT("impossible case"); + } + } + const TVector<TIntrusivePtr<TLevelSegment>>& GetLevelSegments() { return LevelSegments; } const TVector<TChunkIdx>& GetCommitChunks() const { return CommitChunks; } const TDiskPartVec& GetFreedHugeBlobs() const { return FreedHugeBlobs; } + const TDiskPartVec& GetAllocatedHugeBlobs() const { return AllocatedHugeBlobs; } const TDeque<TChunkIdx>& GetReservedChunks() const { return ReservedChunks; } const TDeque<TChunkIdx>& GetAllocatedChunks() const { return AllocatedChunks; } private: - void CollectRemovedHugeBlobs(const TVector<TDiskPart> &hugeBlobs) { + void CollectRemovedHugeBlobs(const std::vector<TDiskPart>& hugeBlobs) { for (const TDiskPart& p : hugeBlobs) { if (!p.Empty()) { FreedHugeBlobs.PushBack(p); @@ -535,33 +568,43 @@ namespace NKikimr { // start item processing; this function transforms item using handoff map and adds collected huge blobs, if any // it returns true if we should keep this item; otherwise it returns false bool PreprocessItem() { - const TKey key = It.GetCurKey(); - // finish merging data for this item - IndexMerger.Finish(); + if constexpr (LogoBlobs) { + IndexMerger.Finish(HugeBlobCtx->IsHugeBlob(GType, Key.LogoBlobID(), MinREALHugeBlobInBytes)); + } else { + IndexMerger.Finish(false); + } // reset transformed item and try to create new one if we want to keep this item const bool keepData = GcmpIt.KeepData(); const bool keepItem = GcmpIt.KeepItem(); - TransformedItem = Hmp->Transform(key, &IndexMerger.GetMemRec(), IndexMerger.GetDataMerger(), keepData, keepItem); if (keepItem) { ++(keepData ? Statistics.KeepItemsWithData : Statistics.KeepItemsWOData); } else { ++Statistics.DontKeepItems; } - // collect huge blobs -- we unconditionally delete DeletedData and save SavedData only in case - // when this blob is written -- that is, when TransformedItem is set. - const TDataMerger *dataMerger = TransformedItem ? TransformedItem->DataMerger : IndexMerger.GetDataMerger(); - if (!TransformedItem) { - CollectRemovedHugeBlobs(dataMerger->GetHugeBlobMerger().SavedData()); + TDataMerger& dataMerger = IndexMerger.GetDataMerger(); + if (keepItem) { + Hmp->Transform(Key, MemRec.emplace(IndexMerger.GetMemRec()), dataMerger, keepData); } - CollectRemovedHugeBlobs(dataMerger->GetHugeBlobMerger().DeletedData()); - return TransformedItem != nullptr; + if constexpr (LogoBlobs) { + if (!keepItem) { // we are deleting this item too, so we drop saved huge blobs here + CollectRemovedHugeBlobs(dataMerger.GetSavedHugeBlobs()); + } + CollectRemovedHugeBlobs(dataMerger.GetDeletedHugeBlobs()); + } + + return keepItem; } ETryProcessItemStatus TryProcessItem() { + // if we have PartitionKey, check it is time to split partitions by PartitionKey + if (PartitionKey && PreviousKey && *PreviousKey < *PartitionKey && Key <= *PartitionKey && WriterPtr) { + return ETryProcessItemStatus::FinishSST; + } + // if there is no active writer, create one and start writing if (!WriterPtr) { // ensure we have enough reserved chunks to do operation; or else request for allocation and wait @@ -575,106 +618,73 @@ namespace NKikimr { (ui32)PDiskCtx->Dsk->ChunkSize, PDiskCtx->Dsk->AppendBlockSize, (ui32)PDiskCtx->Dsk->BulkWriteBlockSize, LevelIndex->AllocSstId(), false, ReservedChunks, Arena, HullCtx->AddHeader); + + WriterHasPendingOperations = false; } - // if we have PartitionKey, check it is time to split partitions by PartitionKey - if (PartitionKey && !IsFirstKey && PreviousKey < *PartitionKey && TransformedItem->Key >= *PartitionKey && - !WriterPtr->Empty()) { + // try to push blob to the index + TDataMerger& dataMerger = IndexMerger.GetDataMerger(); + TDiskPart preallocatedLocation; + if (!WriterPtr->PushIndexOnly(Key, *MemRec, LogoBlobs ? &dataMerger : nullptr, &preallocatedLocation)) { return ETryProcessItemStatus::FinishSST; } - // special logic for fresh: we just put this item into data segment as usual, do not preallocate and then - // write data - if (IsFresh) { - const bool itemWritten = WriterPtr->Push(TransformedItem->Key, *TransformedItem->MemRec, - TransformedItem->DataMerger); - if (itemWritten) { - Statistics.ItemAdded(); - return ETryProcessItemStatus::Success; + // count added item + Statistics.ItemAdded(); + + if constexpr (LogoBlobs) { + const TLogoBlobID& blobId = Key.LogoBlobID(); + + auto& collectTask = dataMerger.GetCollectTask(); + if (MemRec->GetType() == TBlobType::DiskBlob && MemRec->DataSize()) { + // ensure preallocated location has correct size + Y_DEBUG_ABORT_UNLESS(preallocatedLocation.ChunkIdx && preallocatedLocation.Size == MemRec->DataSize()); + // producing inline blob with data here + for (const auto& [location, partIdx] : collectTask.Reads) { + ReadBatcher.AddReadItem(location, {NextDeferredItemId, partIdx, blobId, location}); + } + if (!collectTask.Reads.empty() || WriterHasPendingOperations) { // defer this blob + DeferredItems.Put(NextDeferredItemId++, collectTask.Reads.size(), preallocatedLocation, + collectTask.BlobMerger, blobId, true); + WriterHasPendingOperations = true; + } else { // we can and will produce this inline blob now + const TDiskPart writtenLocation = WriterPtr->PushDataOnly(dataMerger.CreateDiskBlob(Arena)); + Y_ABORT_UNLESS(writtenLocation == preallocatedLocation); + } } else { - return ETryProcessItemStatus::FinishSST; + Y_ABORT_UNLESS(collectTask.BlobMerger.Empty()); + Y_ABORT_UNLESS(collectTask.Reads.empty()); } - } - - // calculate inplaced data size: extract TLogoBlobID from the key, calculate total blob size, then reduce - // it to part size using layout information and finally calculate serialized blob size using number of local - // parts stored in this record; if inplacedDataSize is zero, then we do not store any data inside SSTable, - // otherwise we store DiskBlob and have to assemble it at data pass - ui32 inplacedDataSize = 0; - const NMatrix::TVectorType partsToStore = TransformedItem->MemRec->GetLocalParts(GType); - if (TransformedItem->DataMerger->GetType() == TBlobType::DiskBlob && !partsToStore.Empty()) { - inplacedDataSize = TDiskBlob::CalculateBlobSize(GType, TransformedItem->Key.LogoBlobID(), partsToStore, - HullCtx->AddHeader); - } - // try to push item into SST; in case of failure there is not enough space to fit this item - TDiskPart preallocatedLocation; - if (WriterPtr->PushIndexOnly(TransformedItem->Key, *TransformedItem->MemRec, TransformedItem->DataMerger, - inplacedDataSize, &preallocatedLocation)) { - // count added item - Statistics.ItemAdded(); - - // if we do generate some small blob, we have to enqueue it in Deferred Items queue and then possibly - // issue some reads - if (inplacedDataSize != 0) { - ui32 numReads = 0; - auto lambda = [&](const TMemRec& memRec) { - Y_ABORT_UNLESS(memRec.GetType() == TBlobType::DiskBlob && memRec.HasData()); - - // find out where our disk blob resides - TDiskDataExtractor extr; - memRec.GetDiskData(&extr, nullptr); - TDiskPart location = extr.SwearOne(); - - // get its vector of local parts stored in that location - NMatrix::TVectorType parts = memRec.GetLocalParts(GType); - - // enqueue read - ReadBatcher.AddReadItem(location.ChunkIdx, location.Offset, location.Size, - TBatcherPayload(NextDeferredItemId, parts, It.GetCurKey().LogoBlobID(), location)); - - ++numReads; - }; - IndexMerger.ForEachSmallDiskBlob(std::move(lambda)); - - // either we read something or it is already in memory - const TDiskBlobMerger& diskBlobMerger = TransformedItem->DataMerger->GetDiskBlobMerger(); - Y_ABORT_UNLESS(!diskBlobMerger.Empty() || numReads > 0, "Key# %s MemRec# %s LocalParts# %s KeepData# %s", - It.GetCurKey().ToString().data(), - TransformedItem->MemRec->ToString(HullCtx->IngressCache.Get(), nullptr).data(), - TransformedItem->MemRec->GetLocalParts(GType).ToString().data(), - GcmpIt.KeepData() ? "true" : "false"); - - // TODO(alexvru): maybe we should get rid of copying DiskBlobMerger? - DeferredItems.Put(NextDeferredItemId, numReads, preallocatedLocation, diskBlobMerger, partsToStore, - It.GetCurKey().LogoBlobID()); - - // advance deferred item identifier - ++NextDeferredItemId; + for (const auto& [partIdx, from, to] : dataMerger.GetHugeBlobWrites()) { + const auto parts = NMatrix::TVectorType::MakeOneHot(partIdx, GType.TotalPartCount()); + DeferredItems.Put(NextDeferredItemId++, 0, to, TDiskBlob(from, parts, GType, blobId), blobId, false); } - // return success indicating that this item required no further processing - return ETryProcessItemStatus::Success; - } else { - // return failure meaning this item should be restarted after flushing current SST - return ETryProcessItemStatus::FinishSST; + for (const auto& [partIdx, from, to] : dataMerger.GetHugeBlobMoves()) { + ReadBatcher.AddReadItem(from, {NextDeferredItemId, partIdx, blobId, from}); + DeferredItems.Put(NextDeferredItemId++, 1, to, TDiskBlobMerger(), blobId, false); + } } + + // return success indicating that this item required no further processing + return ETryProcessItemStatus::Success; } void FinishItem() { + // adjust previous key + PreviousKey.emplace(Key); // clear merger and on-disk record list and advance both iterators synchronously IndexMerger.Clear(); It.Next(); GcmpIt.Next(); - TransformedItem = nullptr; + MemRec.reset(); } - bool FlushSST(TVector<std::unique_ptr<IEventBase>>& msgsForYard) { + bool FlushSST() { // try to flush some more data; if the flush fails, it means that we have reached in flight write limit and // there is nothing to do here now, so we return - const bool flushDone = WriterPtr->FlushNext(FirstLsn, LastLsn, MaxInFlightWrites - InFlightWrites); - ProcessPendingMessages(msgsForYard); - if (!flushDone) { + if (!WriterPtr->FlushNext(FirstLsn, LastLsn, MaxInFlightWrites - InFlightWrites)) { return false; } @@ -683,20 +693,18 @@ namespace NKikimr { LevelSegments.push_back(conclusion.LevelSegment); CommitChunks.insert(CommitChunks.end(), conclusion.UsedChunks.begin(), conclusion.UsedChunks.end()); - // ensure that all writes were executed and drop writer - Y_ABORT_UNLESS(!WriterPtr->GetPendingMessage()); - WriterPtr.reset(); - return true; } void ProcessPendingMessages(TVector<std::unique_ptr<IEventBase>>& msgsForYard) { // ensure that we have writer Y_ABORT_UNLESS(WriterPtr); + Y_ABORT_UNLESS(MaxInFlightWrites); + Y_ABORT_UNLESS(MaxInFlightReads); // send new messages until we reach in flight limit std::unique_ptr<NPDisk::TEvChunkWrite> msg; - while (InFlightWrites < MaxInFlightWrites && (msg = WriterPtr->GetPendingMessage())) { + while (InFlightWrites < MaxInFlightWrites && (msg = GetPendingWriteMessage())) { HullCtx->VCtx->CountCompactionCost(*msg); Statistics.Update(msg.get()); msgsForYard.push_back(std::move(msg)); @@ -715,6 +723,15 @@ namespace NKikimr { } } + std::unique_ptr<NPDisk::TEvChunkWrite> GetPendingWriteMessage() { + std::unique_ptr<NPDisk::TEvChunkWrite> res = WriterPtr->GetPendingMessage(); + if (!res && !PendingWrites.empty()) { + res = std::move(PendingWrites.front()); + PendingWrites.pop_front(); + } + return res; + } + std::unique_ptr<NPDisk::TEvChunkReserve> CheckForReservation() { if (ReservedChunks.size() + ChunkReservePending >= ChunksToUse) { return nullptr; diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h index 879f30e984c..071117234df 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h @@ -2,6 +2,7 @@ #include "defs.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h> +#include <ydb/core/blobstorage/vdisk/common/disk_part.h> #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h> #include <util/generic/queue.h> @@ -115,9 +116,10 @@ namespace NKikimr { // enqueue read item -- a read from specific chunk at desired position and length; function returns serial // number of this request; all results are then reported sequently in ascending order of returned serial - ui64 AddReadItem(TChunkIdx chunkIdx, ui32 offset, ui32 size, TPayload&& payload) { + ui64 AddReadItem(TDiskPart location, TPayload&& payload) { const ui64 serial = NextSerial++; - ReadQueue.push_back(TReadItem{chunkIdx, offset, size, std::move(payload), serial, NKikimrProto::UNKNOWN, {}}); + ReadQueue.push_back(TReadItem{location.ChunkIdx, location.Offset, location.Size, std::move(payload), serial, + NKikimrProto::UNKNOWN, {}}); return serial; } diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp index f23ad8640c4..535d4e6b69e 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp @@ -66,7 +66,7 @@ Y_UNIT_TEST_SUITE(ReadBatcher) { const ui32 offset = rng() % chunkSize; const ui32 len = 1 + rng() % Min<ui32>(100000, chunkSize + 1 - offset); TPayload payload{it->first, offset, len}; - batcher.AddReadItem(it->first, offset, len, std::move(payload)); + batcher.AddReadItem({it->first, offset, len}, std::move(payload)); } batcher.Start(); diff --git a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp index 4b64cb675a4..fc7d8b46790 100644 --- a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp @@ -12,11 +12,9 @@ namespace NKikimr { TReleaseQueueItem& item = ReleaseQueue.front(); if (CurrentSnapshots.empty() || (item.RecordLsn <= CurrentSnapshots.begin()->first)) { // matching record -- commit it to huge hull keeper and throw out of the queue - if (item.WId) { + if (!item.RemovedHugeBlobs.Empty() || !item.AllocatedHugeBlobs.Empty()) { ctx.Send(hugeKeeperId, new TEvHullFreeHugeSlots(std::move(item.RemovedHugeBlobs), - item.RecordLsn, item.Signature, item.WId)); - } else { - Y_ABORT_UNLESS(item.RemovedHugeBlobs.Empty()); + std::move(item.AllocatedHugeBlobs), item.RecordLsn, item.Signature, item.WId)); } if (item.ChunksToForget) { LOG_DEBUG(ctx, NKikimrServices::BS_VDISK_CHUNKS, VDISKP(vctx->VDiskLogPrefix, diff --git a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h index 34e6a6c3447..92bf649012c 100644 --- a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h +++ b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h @@ -55,14 +55,16 @@ namespace NKikimr { struct TReleaseQueueItem { ui64 RecordLsn; TDiskPartVec RemovedHugeBlobs; + TDiskPartVec AllocatedHugeBlobs; TVector<TChunkIdx> ChunksToForget; TLogSignature Signature; ui64 WId; - TReleaseQueueItem(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TVector<TChunkIdx> chunksToForget, - TLogSignature signature, ui64 wId) + TReleaseQueueItem(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TDiskPartVec&& allocatedHugeBlobs, + TVector<TChunkIdx> chunksToForget, TLogSignature signature, ui64 wId) : RecordLsn(recordLsn) , RemovedHugeBlobs(std::move(removedHugeBlobs)) + , AllocatedHugeBlobs(std::move(allocatedHugeBlobs)) , ChunksToForget(std::move(chunksToForget)) , Signature(signature) , WId(wId) @@ -89,17 +91,21 @@ namespace NKikimr { // this function is called every time when compaction is about to commit new entrypoint containing at least // one removed huge blob; recordLsn is allocated LSN of this entrypoint - void Update(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TVector<TChunkIdx> chunksToForget, TLogSignature signature, - ui64 wId, const TActorContext& ctx, const TActorId& hugeKeeperId, const TActorId& skeletonId, - const TPDiskCtxPtr& pdiskCtx, const TVDiskContextPtr& vctx) { + void Update(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TDiskPartVec&& allocatedHugeBlobs, + TVector<TChunkIdx> chunksToForget, TLogSignature signature, ui64 wId, const TActorContext& ctx, + const TActorId& hugeKeeperId, const TActorId& skeletonId, const TPDiskCtxPtr& pdiskCtx, + const TVDiskContextPtr& vctx) { Y_ABORT_UNLESS(recordLsn > LastDeletionLsn); LastDeletionLsn = recordLsn; LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLCOMP, vctx->VDiskLogPrefix - << "TDelayedCompactionDeleter: Update recordLsn# " << recordLsn << " removedHugeBlobs.size# " - << removedHugeBlobs.Size() << " CurrentSnapshots.size# " << CurrentSnapshots.size() + << "TDelayedCompactionDeleter: Update recordLsn# " << recordLsn + << " removedHugeBlobs.size# " << removedHugeBlobs.Size() + << " allocatedHugeBlobs.size# " << allocatedHugeBlobs.Size() + << " CurrentSnapshots.size# " << CurrentSnapshots.size() << " CurrentSnapshots.front# " << (CurrentSnapshots.empty() ? 0 : CurrentSnapshots.begin()->first) << " CurrentSnapshots.back# " << (CurrentSnapshots.empty() ? 0 : (--CurrentSnapshots.end())->first)); - ReleaseQueue.emplace_back(recordLsn, std::move(removedHugeBlobs), std::move(chunksToForget), signature, wId); + ReleaseQueue.emplace_back(recordLsn, std::move(removedHugeBlobs), std::move(allocatedHugeBlobs), + std::move(chunksToForget), signature, wId); ProcessReleaseQueue(ctx, hugeKeeperId, skeletonId, pdiskCtx, vctx); } diff --git a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h index 7f5010430b1..ecfe2175ef6 100644 --- a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h +++ b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h @@ -97,8 +97,7 @@ namespace NKikimr { } ui8 CountBits() const { - unsigned v = Vec; - return ::PopCount(v); + return ::PopCount(Vec); } ui8 Raw() const { @@ -134,8 +133,7 @@ namespace NKikimr { } TVectorType operator ~() const { - ui8 v = ~Vec; - return TVectorType(v, Size); + return TVectorType(~Vec, Size); } bool operator ==(const TVectorType &v) const { @@ -174,6 +172,34 @@ namespace NKikimr { return res; } + class TIterator { + const TVectorType& Parts; + ui8 Index; + + public: + TIterator(const TVectorType& parts, bool end) + : Parts(parts) + , Index(end ? Parts.GetSize() : Parts.FirstPosition()) + {} + + friend bool operator ==(const TIterator& x, const TIterator& y) { + Y_DEBUG_ABORT_UNLESS(&x.Parts == &y.Parts); + return x.Index == y.Index; + } + + ui8 operator *() const { + return Index; + } + + TIterator& operator ++() { + Index = Parts.NextPosition(Index); + return *this; + } + }; + + TIterator begin() const { return {*this, false}; } + TIterator end() const { return {*this, true}; } + private: ui8 Vec; ui8 Size; diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp index 0ff8c5bc8fc..1ffd89f504b 100644 --- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp +++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp @@ -75,7 +75,6 @@ namespace NKikimr { NHuge::TAllocChunkRecoveryLogRec HugeBlobAllocChunkRecoveryLogRec; NHuge::TFreeChunkRecoveryLogRec HugeBlobFreeChunkRecoveryLogRec; NHuge::TPutRecoveryLogRec HugeBlobPutRecoveryLogRec; - TDiskPartVec HugeBlobs; NKikimrVDiskData::TPhantomLogoBlobs PhantomLogoBlobs; void Bootstrap(const TActorContext &ctx) { @@ -700,9 +699,9 @@ namespace NKikimr { if (!good) return EDispatchStatus::Error; - HugeBlobs = TDiskPartVec(pb.GetRemovedHugeBlobs()); ui64 lsn = record.Lsn; - TRlas res = LocRecCtx->RepairedHuge->ApplySlotsDeletion(ctx, lsn, HugeBlobs, dbType); + TRlas res = LocRecCtx->RepairedHuge->ApplySlotsDeletion(ctx, lsn, TDiskPartVec(pb.GetRemovedHugeBlobs()), + TDiskPartVec(pb.GetAllocatedHugeBlobs()), dbType); if (!res.Ok) return EDispatchStatus::Error; diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp index 107936cde0b..cda349b5219 100644 --- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp +++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp @@ -521,6 +521,7 @@ namespace NKikimr { Config->HullSstSizeInChunksLevel, Config->HullCompFreeSpaceThreshold, Config->FreshCompMaxInFlightWrites, + Config->FreshCompMaxInFlightReads, Config->HullCompMaxInFlightWrites, Config->HullCompMaxInFlightReads, Config->HullCompReadBatchEfficiencyThreshold, diff --git a/ydb/core/blobstorage/vdisk/protos/events.proto b/ydb/core/blobstorage/vdisk/protos/events.proto index 78a70268bcd..2e98796ac20 100644 --- a/ydb/core/blobstorage/vdisk/protos/events.proto +++ b/ydb/core/blobstorage/vdisk/protos/events.proto @@ -60,6 +60,7 @@ message FreshSegmentStat { message FreshStat { uint32 compaction_writes_in_flight = 1; + uint32 compaction_reads_in_flight = 5; FreshSegmentStat current = 2; FreshSegmentStat dreg = 3; FreshSegmentStat old = 4; diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp index 7aff723af2c..b53078dc238 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp @@ -13,12 +13,7 @@ namespace NKikimr { // TReadBatcher implementation //////////////////////////////////////////////////////////////////////////// // Traverse data parts for a single key - void TReadBatcher::StartTraverse( - const TLogoBlobID& id, - void *cookie, - ui8 queryPartId, - ui32 queryShift, - ui32 querySize) { + void TReadBatcher::StartTraverse(const TLogoBlobID& id, void *cookie, ui8 queryPartId, ui32 queryShift, ui32 querySize) { Y_DEBUG_ABORT_UNLESS(id.PartId() == 0); Y_DEBUG_ABORT_UNLESS(!Traversing); ClearTmpItems(); @@ -30,24 +25,18 @@ namespace NKikimr { QueryPartId = queryPartId; QueryShift = queryShift; QuerySize = querySize; - FoundDiskItems.clear(); - FoundInMemItems.clear(); } // We have data on disk void TReadBatcher::operator () (const TDiskPart &data, NMatrix::TVectorType parts) { Y_DEBUG_ABORT_UNLESS(Traversing); - FoundDiskItems.emplace_back(data, parts); - } - - void TReadBatcher::ProcessFoundDiskItem(const TDiskPart &data, NMatrix::TVectorType parts) { if (QueryPartId && !parts.Get(QueryPartId - 1)) { return; // we have no requested part here } const auto& gtype = Ctx->VCtx->Top->GType; ui32 blobSize = 0; - for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) { + for (ui8 i : parts) { blobSize += gtype.PartSize(TLogoBlobID(CurID, i + 1)); } @@ -58,7 +47,7 @@ namespace NKikimr { Y_ABORT_UNLESS(blobSize == data.Size); } - for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) { + for (ui8 i : parts) { const TLogoBlobID partId(CurID, i + 1); const ui32 partSize = gtype.PartSize(partId); if (QueryPartId == 0 || QueryPartId == i + 1) { @@ -84,10 +73,6 @@ namespace NKikimr { // We have diskBlob in memory void TReadBatcher::operator () (const TDiskBlob &diskBlob) { Y_DEBUG_ABORT_UNLESS(Traversing); - FoundInMemItems.push_back(diskBlob); - } - - void TReadBatcher::ProcessFoundInMemItem(const TDiskBlob &diskBlob) { if (QueryPartId == 0 || diskBlob.GetParts().Get(QueryPartId - 1)) { // put data item iff we gather all parts OR we need a concrete part and parts contain it for (TDiskBlob::TPartIterator it = diskBlob.begin(), e = diskBlob.end(); it != e; ++it) { @@ -116,22 +101,12 @@ namespace NKikimr { Y_DEBUG_ABORT_UNLESS(Traversing); Traversing = false; - // process found items; first, we process disk items; then, we process in-mem items that may possibly - // overwrite disk ones to prevent read IOPS - for (const std::tuple<TDiskPart, NMatrix::TVectorType> &diskItem : FoundDiskItems) { - ProcessFoundDiskItem(std::get<0>(diskItem), std::get<1>(diskItem)); - } - for (const TDiskBlob &diskBlob : FoundInMemItems) { - ProcessFoundInMemItem(diskBlob); - } - // NOTE: we may have parts that are not replicated yet; // we MUST NOT return NO_DATA for them; but when parts are missing due to finished GC, we report NODATA - NMatrix::TVectorType mustHave = ingress.PartsWeMustHaveLocally(Ctx->VCtx->Top.get(), - Ctx->VCtx->ShortSelfVDisk, CurID); - NMatrix::TVectorType actuallyHave = ingress.LocalParts(Ctx->VCtx->Top->GType); - NMatrix::TVectorType missingParts = mustHave - actuallyHave; - for (ui8 i = missingParts.FirstPosition(); i != missingParts.GetSize(); i = missingParts.NextPosition(i)) { + const auto mustHave = ingress.PartsWeMustHaveLocally(Ctx->VCtx->Top.get(), Ctx->VCtx->ShortSelfVDisk, CurID); + const auto actuallyHave = ingress.LocalParts(Ctx->VCtx->Top->GType); + const auto missingParts = mustHave - actuallyHave; + for (ui8 i : missingParts) { // NOT_YET if (QueryPartId == 0 || i + 1 == QueryPartId) { Y_ABORT_UNLESS(TmpItems[i].Empty()); diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.h b/ydb/core/blobstorage/vdisk/query/query_readbatch.h index 90cdb84c8aa..280a1fea3a4 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.h +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.h @@ -307,10 +307,8 @@ namespace NKikimr { // We have data on disk void operator () (const TDiskPart &data, NMatrix::TVectorType parts); - void ProcessFoundDiskItem(const TDiskPart &data, NMatrix::TVectorType parts); // We have diskBlob in memory void operator () (const TDiskBlob &diskBlob); - void ProcessFoundInMemItem(const TDiskBlob &diskBlob); // Finish data traverse for a single key void FinishTraverse(const TIngress &ingress); // Abort traverse without giving out results @@ -349,9 +347,6 @@ namespace NKikimr { std::shared_ptr<TReadBatcherResult> Result; ui64 PDiskReadBytes = 0; - TStackVec<std::tuple<TDiskPart, NMatrix::TVectorType>, MaxTotalPartCount> FoundDiskItems; - TStackVec<TDiskBlob, MaxTotalPartCount> FoundInMemItems; - NReadBatcher::TGlueRead *AddGlueRead(NReadBatcher::TDataItem *item); void PrepareReadPlan(); TString DiskDataItemsToString() const; diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h index e395596774f..0e1d195761a 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h @@ -53,6 +53,7 @@ namespace NKikimr { , HullDs(std::move(hullDs)) , State(EState::STOPPED) , MinReservedChunksCount(ReplCtx->VDiskCfg->HullSstSizeInChunksFresh) + , Merger(ReplCtx->VCtx->Top->GType, ReplCtx->GetAddHeader()) , Arena(&TRopeArenaBackend::Allocate) {} @@ -148,19 +149,21 @@ namespace NKikimr { Y_ABORT_UNLESS(record.Id > PrevID); Y_ABORT_UNLESS(!record.Id.PartId()); - // generate merged ingress for all locally recovered parts - TIngress ingress = TIngress::CreateFromRepl(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk, - record.Id, record.LocalParts); + TMemRecLogoBlob memRec(TIngress::CreateFromRepl(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk, + record.Id, record.LocalParts)); - // add disk blob to merger (in order to write it to SSTable) - Merger.AddBlob(TDiskBlob(&record.Data, record.LocalParts, ReplCtx->VCtx->Top->GType, record.Id)); + // set merger state to match generated blob + memRec.SetType(TBlobType::DiskBlob); + memRec.SetDiskBlob(TDiskPart(0, 0, record.Data.GetSize())); + Merger.FinishFromBlob(); - // create memory record for disk blob with correct length - TMemRecLogoBlob memRec(ingress); - memRec.SetDiskBlob(TDiskPart(0, 0, Merger.GetDiskBlobRawSize(ReplCtx->GetAddHeader()))); - - const bool success = Writer->Push(record.Id, memRec, &Merger); + TDiskPart preallocatedLocation; + const bool success = Writer->PushIndexOnly(record.Id, memRec, &Merger, &preallocatedLocation); if (success) { + Y_DEBUG_ABORT_UNLESS(Merger.GetCollectTask().Reads.empty()); + const TDiskPart writtenLocation = Writer->PushDataOnly(std::move(record.Data)); + Y_ABORT_UNLESS(writtenLocation == preallocatedLocation); + if (auto msg = Writer->GetPendingMessage()) { IssueWriteCmd(std::move(msg), EOutputState::INTERMEDIATE_CHUNK); } diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp index e4c184ed855..e07f2752f87 100644 --- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp +++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp @@ -39,7 +39,7 @@ TVDiskContextPtr CreateVDiskContext(const TBlobStorageGroupInfo& info) { TIntrusivePtr<THullCtx> CreateHullCtx(const TBlobStorageGroupInfo& info, ui32 chunkSize, ui32 compWorthReadSize) { return MakeIntrusive<THullCtx>(CreateVDiskContext(info), chunkSize, compWorthReadSize, true, true, true, true, 1u, - 1u, 2.0, 10u, 10u, 20u, 0.5, TDuration::Minutes(5), TDuration::Seconds(1), true); + 1u, 2.0, 10u, 10u, 10u, 20u, 0.5, TDuration::Minutes(5), TDuration::Seconds(1), true); } TIntrusivePtr<THullDs> CreateHullDs(const TBlobStorageGroupInfo& info) { diff --git a/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp b/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp index 077a8eaea53..ea9f9639cac 100644 --- a/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp @@ -146,7 +146,7 @@ namespace NKikimr { // filter out the read queue -- remove items that are already available or not needed auto remove = [](const TReadCmd& item) { const NMatrix::TVectorType missing = item.Item->Needed - item.Item->GetAvailableParts(); - return (missing & item.Parts).Empty() || item.Location == item.Item->CorruptedPart; + return (missing & item.Parts).Empty() || item.Location.Includes(item.Item->CorruptedPart); }; ReadQ.erase(std::remove_if(ReadQ.begin(), ReadQ.end(), remove), ReadQ.end()); diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 54e62b22f40..0832fbc6234 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -206,6 +206,9 @@ namespace NKikimr { if (Config->RunRepl) { ctx.Send(Db->ReplID, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes)); } + if (Hull) { + Hull->ApplyHugeBlobSize(MinREALHugeBlobInBytes, ctx); + } ctx.Send(*SkeletonFrontIDPtr, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes)); } } @@ -1861,6 +1864,7 @@ namespace NKikimr { if (ev->Get()->Status == NKikimrProto::OK) { ApplyHugeBlobSize(Config->MinHugeBlobInBytes); Y_ABORT_UNLESS(MinREALHugeBlobInBytes); + // handle special case when donor disk starts and finds out that it has been wiped out if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) { // send drop donor cmd to NodeWarden @@ -1939,11 +1943,11 @@ namespace NKikimr { Db->HugeKeeperID); // create Hull - Hull = std::make_shared<THull>(Db->LsnMngr, PDiskCtx, Db->SkeletonID, - Config->BalancingEnableDelete, std::move(*ev->Get()->Uncond), - ctx.ExecutorThread.ActorSystem, Config->BarrierValidation); + Hull = std::make_shared<THull>(Db->LsnMngr, PDiskCtx, HugeBlobCtx, MinREALHugeBlobInBytes, + Db->SkeletonID, Config->BalancingEnableDelete, std::move(*ev->Get()->Uncond), + ctx.ExecutorThread.ActorSystem, Config->BarrierValidation, Db->HugeKeeperID); ActiveActors.Insert(Hull->RunHullServices(Config, HullLogCtx, Db->SyncLogFirstLsnToKeep, - Db->LoggerID, Db->LogCutterID, ctx), ctx, NKikimrServices::BLOBSTORAGE); + Db->LoggerID, Db->LogCutterID, ctx), ctx, NKikimrServices::BLOBSTORAGE); // create VDiskCompactionState VDiskCompactionState = std::make_unique<TVDiskCompactionState>(Hull->GetHullDs()->LogoBlobs->LIActor, diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h index 09cb0d95ac3..70a5528eeb4 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h @@ -34,6 +34,7 @@ namespace NKikimr { // then, scan all the blobs struct TMerger { TRes& Res; + ui64 SstId; ui32 Level; static constexpr bool HaveToMergeData() { return true; } @@ -43,7 +44,7 @@ namespace NKikimr { void Clear() {} void AddFromSegment(const TMemRecLogoBlob& memRec, const TDiskPart *outbound, const TKeyLogoBlob& key, - ui64 circaLsn) { + ui64 /*circaLsn*/) { TDiskDataExtractor extr; memRec.GetDiskData(&extr, outbound); const TRes::ERecordType recordType = extr.BlobType == TBlobType::DiskBlob @@ -52,7 +53,7 @@ namespace NKikimr { for (const TDiskPart *location = extr.Begin; location != extr.End; ++location) { if (location->ChunkIdx && location->Size) { Res.Layout.push_back({*location, TRes::EDatabase::LogoBlobs, recordType, key.LogoBlobID(), - circaLsn, Level}); + SstId, Level}); } } } @@ -66,7 +67,7 @@ namespace NKikimr { }; auto logoBlobsCallback = [&](const auto& sst, ui32 level) { - TMerger merger{*res, level}; + TMerger merger{*res, sst->AssignedSstId, level}; TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob>::TMemIterator iter(sst.Get()); for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { iter.PutToMerger(&merger); @@ -79,7 +80,7 @@ namespace NKikimr { traverse(Snap.BlocksSnap.SliceSnap, ignoreCallback, TRes::EDatabase::Blocks); traverse(Snap.BarriersSnap.SliceSnap, ignoreCallback, TRes::EDatabase::Barriers); - TMerger merger{*res, Max<ui32>()}; + TMerger merger{*res, 0, Max<ui32>()}; TFreshDataSnapshot<TKeyLogoBlob, TMemRecLogoBlob>::TForwardIterator iter(Snap.HullCtx, &Snap.LogoBlobsSnap.FreshSnap); THeapIterator<TKeyLogoBlob, TMemRecLogoBlob, true> heapIt(&iter); heapIt.Walk(TKeyLogoBlob::First(), &merger, [] (TKeyLogoBlob /*key*/, auto* /*merger*/) { return true; }); diff --git a/ydb/core/protos/blobstorage_vdisk_internal.proto b/ydb/core/protos/blobstorage_vdisk_internal.proto index 69f3b49235c..9fa2877716e 100644 --- a/ydb/core/protos/blobstorage_vdisk_internal.proto +++ b/ydb/core/protos/blobstorage_vdisk_internal.proto @@ -72,6 +72,7 @@ message THullDbEntryPoint { optional TLevelIndex LevelIndex = 1; optional TDiskPartVec RemovedHugeBlobs = 2; + optional TDiskPartVec AllocatedHugeBlobs = 4; // obsolete field repeated TUncommittedRemovedHugeBlob UncommittedRemovedHugeBlobs = 3; |