author    Alexander Rutkovsky <alexvru@ydb.tech>  2024-11-12 12:12:33 +0300
committer GitHub <noreply@github.com>             2024-11-12 09:12:33 +0000
commit    e62a6425cc6cbabb06b09d03cfc8f9b6ad1cb75c (patch)
tree      b46a9a151f482775c051980aa8224de24e47bc68
parent    5f2bf90bebedec6c74fc7f66002b35c585f8060e (diff)
Implement intermixed inplace and huge blobs and add test for this case (#10973)
-rw-r--r--  ydb/core/base/blobstorage.h | 3
-rw-r--r--  ydb/core/blobstorage/ut_blobstorage/huge.cpp | 240
-rw-r--r--  ydb/core/blobstorage/ut_blobstorage/lib/defs.h | 1
-rw-r--r--  ydb/core/blobstorage/ut_blobstorage/lib/env.h | 68
-rw-r--r--  ydb/core/blobstorage/ut_blobstorage/ut_huge/ya.make | 21
-rw-r--r--  ydb/core/blobstorage/ut_blobstorage/ya.make | 1
-rw-r--r--  ydb/core/blobstorage/ut_vdisk2/huge.cpp | 14
-rw-r--r--  ydb/core/blobstorage/vdisk/balance/handoff_map.h | 193
-rw-r--r--  ydb/core/blobstorage/vdisk/common/disk_part.h | 4
-rw-r--r--  ydb/core/blobstorage/vdisk/common/vdisk_config.cpp | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/common/vdisk_config.h | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/common/vdisk_events.h | 9
-rw-r--r--  ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/defrag/defrag_search.h | 23
-rw-r--r--  ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp | 123
-rw-r--r--  ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h | 31
-rw-r--r--  ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h | 163
-rw-r--r--  ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs_ut.cpp | 63
-rw-r--r--  ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp | 20
-rw-r--r--  ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/huge/ut/ya.make | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h | 110
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp | 47
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp | 6
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h | 2
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h | 5
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h | 4
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h | 16
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h | 358
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h | 149
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h | 98
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp | 49
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp | 3
-rw-r--r--  ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp | 66
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h | 10
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp | 65
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h | 15
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h | 18
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h | 29
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h | 19
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp | 78
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h | 359
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h | 6
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp | 2
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp | 6
-rw-r--r--  ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h | 22
-rw-r--r--  ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h | 34
-rw-r--r--  ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp | 5
-rw-r--r--  ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/protos/events.proto | 1
-rw-r--r--  ydb/core/blobstorage/vdisk/query/query_readbatch.cpp | 39
-rw-r--r--  ydb/core/blobstorage/vdisk/query/query_readbatch.h | 5
-rw-r--r--  ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h | 23
-rw-r--r--  ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp | 2
-rw-r--r--  ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp | 2
-rw-r--r--  ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp | 12
-rw-r--r--  ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h | 9
-rw-r--r--  ydb/core/protos/blobstorage_vdisk_internal.proto | 1
60 files changed, 1471 insertions(+), 1189 deletions(-)
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index e7dcbb53a5e..fb48e5185b6 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -738,6 +738,9 @@ struct TEvBlobStorage {
EvHugePreCompactResult,
EvPDiskMetadataLoaded,
EvBalancingSendPartsOnMain,
+ EvHugeAllocateSlots,
+ EvHugeAllocateSlotsResult,
+ EvHugeDropAllocatedSlots,
EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
diff --git a/ydb/core/blobstorage/ut_blobstorage/huge.cpp b/ydb/core/blobstorage/ut_blobstorage/huge.cpp
new file mode 100644
index 00000000000..6ec94afe95c
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/huge.cpp
@@ -0,0 +1,240 @@
+#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
+#include <ydb/core/blobstorage/ut_blobstorage/lib/common.h>
+#include <ydb/core/util/lz4_data_generator.h>
+#include <ydb/core/erasure/erasure.h>
+
+namespace {
+
+ class THugeBlobTest {
+ TEnvironmentSetup Env;
+ TTestActorSystem& Runtime;
+ ui32 DataSize = 32_KB;
+ TString Data = FastGenDataForLZ4(DataSize, 1 /*seed*/);
+ TLogoBlobID BlobId{1000, 1, 1, 0, DataSize, 0};
+ ui32 GroupId;
+ TIntrusivePtr<TBlobStorageGroupInfo> Info;
+ TBlobStorageGroupType GType;
+ TBlobStorageGroupInfo::TVDiskIds VDiskIds;
+ TBlobStorageGroupInfo::TServiceIds ServiceIds;
+ std::vector<TActorId> PutQueueIds;
+ std::vector<TActorId> GetQueueIds;
+ std::vector<TRope> Parts;
+ ui8 TestSubgroupNodeId = 6;
+
+ public:
+ THugeBlobTest()
+ : Env{{
+ .Erasure = TBlobStorageGroupType::Erasure4Plus2Block,
+ .UseFakeConfigDispatcher = true,
+ }}
+ , Runtime(*Env.Runtime)
+ {
+ Env.CreateBoxAndPool(1, 1);
+
+ std::vector<ui32> groups = Env.GetGroups();
+ UNIT_ASSERT(!groups.empty());
+ GroupId = groups.front();
+
+ Info = Env.GetGroupInfo(GroupId);
+ GType = Info->Type;
+
+ Info->PickSubgroup(BlobId.Hash(), &VDiskIds, &ServiceIds);
+
+ for (const TVDiskID& vdiskId : VDiskIds) {
+ PutQueueIds.push_back(Env.CreateQueueActor(vdiskId, NKikimrBlobStorage::PutTabletLog, 0));
+ GetQueueIds.push_back(Env.CreateQueueActor(vdiskId, NKikimrBlobStorage::GetFastRead, 0));
+ }
+
+ Parts.resize(GType.TotalPartCount());
+ const bool success = ErasureSplit(TErasureType::CrcModeNone, GType, TRope(Data), Parts);
+ UNIT_ASSERT(success);
+
+// for (ui32 i = 0; i < 6; ++i) { // put main parts
+// Put(i, i);
+// }
+ }
+
+ void Put(ui8 subgroupNodeId, ui8 partIdx) {
+ Cerr << "writing partIdx# " << (int)partIdx << " to " << (int)subgroupNodeId << Endl;
+ const TActorId queueActorId = PutQueueIds[subgroupNodeId];
+ auto edge = Runtime.AllocateEdgeActor(queueActorId.NodeId(), __FILE__, __LINE__);
+ const TLogoBlobID putId(BlobId, partIdx + 1);
+ Runtime.Send(new IEventHandle(queueActorId, edge, new TEvBlobStorage::TEvVPut(putId, Parts[partIdx],
+ VDiskIds[subgroupNodeId], false, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog)),
+ queueActorId.NodeId());
+ auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVPutResult>(edge);
+ auto& record = res->Get()->Record;
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
+ }
+
+ void SwitchHugeBlobSize(bool small) {
+ Cerr << "small blob# " << small << Endl;
+
+ auto ev = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
+ auto& record = ev->Record;
+ auto& config = *record.MutableConfig();
+ auto& bsConfig = *config.MutableBlobStorageConfig();
+ auto& perf = *bsConfig.MutableVDiskPerformanceSettings();
+ auto& type = *perf.AddVDiskTypes();
+ type.SetPDiskType(NKikimrBlobStorage::EPDiskType::ROT);
+ type.SetMinHugeBlobSizeInBytes(small ? 4096 : 524288);
+
+ const ui32 serviceNodeId = ServiceIds[TestSubgroupNodeId].NodeId();
+ TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__);
+ Runtime.Send(new IEventHandle(NConsole::MakeConfigsDispatcherID(serviceNodeId), edge, ev.release()),
+ serviceNodeId);
+ Env.WaitForEdgeActorEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>(edge);
+ }
+
+ void CompactFresh() {
+ Cerr << "compacting fresh" << Endl;
+
+ const TActorId serviceId = ServiceIds[TestSubgroupNodeId];
+ const ui32 serviceNodeId = serviceId.NodeId();
+ TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__);
+ Runtime.Send(new IEventHandle(serviceId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs,
+ TEvCompactVDisk::EMode::FRESH_ONLY)), serviceNodeId);
+ Env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge);
+ }
+
+ void CompactLevels() {
+ Cerr << "compacting levels" << Endl;
+
+ const TActorId serviceId = ServiceIds[TestSubgroupNodeId];
+ const ui32 serviceNodeId = serviceId.NodeId();
+ TActorId edge = Runtime.AllocateEdgeActor(serviceNodeId, __FILE__, __LINE__);
+ Runtime.Send(new IEventHandle(serviceId, edge, TEvCompactVDisk::Create(EHullDbType::LogoBlobs,
+ TEvCompactVDisk::EMode::FULL)), serviceNodeId);
+ Env.WaitForEdgeActorEvent<TEvCompactVDiskResult>(edge);
+ }
+
+ void PutPartsInMask(ui32 mask) {
+ for (ui32 i = 0; i < GType.TotalPartCount(); ++i) {
+ if (mask & (1 << i)) {
+ Put(TestSubgroupNodeId, i);
+ }
+ }
+ }
+
+ void CheckPartsInPlace(ui32 mask, ui32 numDistinctSST) {
+ Cerr << "checking parts in place mask# " << mask << Endl;
+
+ std::set<ui64> sstIds;
+ const TActorId serviceId = ServiceIds[TestSubgroupNodeId];
+ auto captureRes = Env.SyncQuery<TEvBlobStorage::TEvCaptureVDiskLayoutResult,
+ TEvBlobStorage::TEvCaptureVDiskLayout>(serviceId);
+ for (const auto& item : captureRes->Layout) {
+ using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
+ if (item.Database == T::EDatabase::LogoBlobs) {
+ Cerr << item.ToString() << Endl;
+ if (item.RecordType == T::ERecordType::IndexRecord && item.SstId) {
+ sstIds.insert(item.SstId);
+ }
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(sstIds.size(), numDistinctSST);
+
+ auto getQuery = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(VDiskIds[TestSubgroupNodeId],
+ TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::FastRead, {}, {}, {{BlobId}});
+ const TActorId queueId = GetQueueIds[TestSubgroupNodeId];
+ const ui32 queueNodeId = queueId.NodeId();
+ auto edge = Runtime.AllocateEdgeActor(queueId.NodeId(), __FILE__, __LINE__);
+ Runtime.Send(new IEventHandle(queueId, edge, getQuery.release()), queueNodeId);
+ auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(edge);
+ auto& record = res->Get()->Record;
+
+ UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
+ if (!mask) {
+ UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
+ auto& result = record.GetResult(0);
+ UNIT_ASSERT_VALUES_EQUAL(LogoBlobIDFromLogoBlobID(result.GetBlobID()), BlobId);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
+ } else {
+ for (const auto& result : record.GetResult()) {
+ const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
+ UNIT_ASSERT_VALUES_EQUAL(id.FullID(), BlobId);
+ UNIT_ASSERT(id.PartId());
+ const ui8 partIdx = id.PartId() - 1;
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::OK);
+ UNIT_ASSERT_EQUAL(res->Get()->GetBlobData(result), Parts[partIdx]);
+ UNIT_ASSERT(mask & (1 << partIdx));
+
+ mask &= ~(1 << partIdx);
+ }
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(mask, 0);
+ }
+
+ void RunTest(ui32 fresh1, ui32 fresh2, ui32 huge1, ui32 huge2, bool targetHuge, ui32 fresh3, ui32 huge3,
+ bool targetHuge2, bool targetHuge3) {
+ if (fresh1 || fresh2) {
+ SwitchHugeBlobSize(false);
+ PutPartsInMask(fresh1);
+ PutPartsInMask(fresh2);
+ CheckPartsInPlace(fresh1 | fresh2, 0);
+ }
+ if (huge1 || huge2) {
+ SwitchHugeBlobSize(true);
+ PutPartsInMask(huge1);
+ PutPartsInMask(huge2);
+ CheckPartsInPlace(huge1 | huge2 | fresh1 | fresh2, 0);
+ }
+ const ui32 baseMask = fresh1 | fresh2 | huge1 | huge2;
+ if (baseMask) {
+ SwitchHugeBlobSize(targetHuge);
+ CompactFresh();
+ CheckPartsInPlace(baseMask, 1);
+ }
+ if (const ui32 extraMask = fresh3 | huge3) {
+ if (fresh3) {
+ SwitchHugeBlobSize(false);
+ PutPartsInMask(fresh3);
+ }
+ if (huge3) {
+ SwitchHugeBlobSize(true);
+ PutPartsInMask(huge3);
+ }
+ SwitchHugeBlobSize(targetHuge2);
+ CompactFresh();
+ CheckPartsInPlace(baseMask | extraMask, !!baseMask + !!extraMask);
+
+ SwitchHugeBlobSize(targetHuge3);
+ CompactLevels();
+ CheckPartsInPlace(baseMask | extraMask, 1);
+ }
+ }
+
+ static void CompactionTest() {
+ for (ui32 fresh1 = 0; fresh1 < 8; ++fresh1) {
+ for (ui32 fresh2 = 0; fresh2 < 2; ++fresh2) {
+ for (ui32 huge1 = 0; huge1 < 4; ++huge1) {
+ for (ui32 huge2 = 0; huge2 < 2; ++huge2) {
+ for (bool targetHuge : {true, false}) {
+ for (ui32 fresh3 = 0; fresh3 < 4; ++fresh3) {
+ for (ui32 huge3 = 0; huge3 < 2; ++huge3) {
+ for (bool targetHuge2 : {true, false}) {
+ for (bool targetHuge3 : {true, false}) {
+ Cerr << "fresh1# " << fresh1 << " fresh2# " << fresh2 << " huge1# " << huge1
+ << " huge2# " << huge2 << " targetHuge# " << targetHuge
+ << " fresh3# " << fresh3
+ << " huge3# " << huge3
+ << " targetHuge2# " << targetHuge2
+ << " targetHuge3# " << targetHuge3
+ << Endl;
+ THugeBlobTest test;
+ test.RunTest(fresh1, fresh2, huge1, huge2, targetHuge, fresh3, huge3, targetHuge2, targetHuge3);
+ }}}}}}}}}
+ }
+ };
+
+}
+
+Y_UNIT_TEST_SUITE(HugeBlobOnlineSizeChange) {
+
+ Y_UNIT_TEST(Compaction) {
+ THugeBlobTest::CompactionTest();
+ }
+
+}
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
index 08f759acd99..9937d560d1a 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/defs.h
@@ -28,6 +28,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/rusage.h>
#include <util/random/fast.h>
+#include <util/stream/null.h>
using namespace NActors;
using namespace NKikimr;
diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
index 104fa4986b9..69b783e74db 100644
--- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h
+++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h
@@ -9,6 +9,8 @@
#include <library/cpp/testing/unittest/registar.h>
+static auto& Cconf = Cnull;
+
struct TEnvironmentSetup {
std::unique_ptr<TTestActorSystem> Runtime;
static constexpr ui32 DrivesPerNode = 5;
@@ -82,7 +84,23 @@ struct TEnvironmentSetup {
class TFakeConfigDispatcher : public TActor<TFakeConfigDispatcher> {
std::unordered_set<TActorId> Subscribers;
- TActorId EdgeId;
+
+ struct TConfigDistributionTask {
+ TActorId Sender;
+ ui64 Cookie;
+ std::unique_ptr<IEventBase> Response;
+ THashSet<TActorId> RepliesPending;
+
+ TConfigDistributionTask(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev)
+ : Sender(ev->Sender)
+ , Cookie(ev->Cookie)
+ , Response(std::make_unique<NConsole::TEvConsole::TEvConfigNotificationResponse>(ev->Get()->Record))
+ {}
+ };
+
+ std::map<std::tuple<TActorId, ui64>, std::shared_ptr<TConfigDistributionTask>> TasksPending;
+ ui64 LastCookie = 0;
+
public:
TFakeConfigDispatcher()
: TActor<TFakeConfigDispatcher>(&TFakeConfigDispatcher::StateWork)
@@ -91,10 +109,11 @@ struct TEnvironmentSetup {
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
- hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle);
+ hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle)
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle)
hFunc(NConsole::TEvConsole::TEvConfigNotificationResponse, Handle)
hFunc(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle)
+ hFunc(TEvents::TEvUndelivered, Handle)
}
}
@@ -103,24 +122,61 @@ struct TEnvironmentSetup {
if (items.at(0) != NKikimrConsole::TConfigItem::BlobStorageConfigItem) {
return;
}
+ Cconf << "@ subscribed " << ev->Sender << Endl;
Subscribers.emplace(ev->Sender);
}
void Handle(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
- EdgeId = ev->Sender;
+ auto task = std::make_shared<TConfigDistributionTask>(ev);
+ const ui64 cookie = ++LastCookie;
+
for (auto& id : Subscribers) {
auto update = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>();
update->Record.CopyFrom(ev->Get()->Record);
- Send(id, update.Release());
+ Send(id, update.Release(), IEventHandle::FlagTrackDelivery, cookie);
+ task->RepliesPending.insert(id);
+ const auto [it, inserted] = TasksPending.emplace(std::make_tuple(id, cookie), task);
+ Y_ABORT_UNLESS(inserted);
+ }
+
+ Cconf << "@ created task " << cookie << " for " << Subscribers.size() << " subscribers from " <<
+ ev->Sender << " cookie " << ev->Cookie << Endl;
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr& ev) {
+ Cconf << "@ undelivered " << ev->Sender << " cookie " << ev->Cookie << Endl;
+ const auto nh = TasksPending.extract(std::make_tuple(ev->Sender, ev->Cookie));
+ Y_ABORT_UNLESS(nh);
+ Finish(*nh.mapped(), ev->Sender);
+ }
+
+ void Finish(TConfigDistributionTask& task, TActorId sender) {
+ Cconf << "@ finish from " << sender << " task " << task.Sender << " cookie " << task.Cookie << Endl;
+ const size_t numErased = task.RepliesPending.erase(sender);
+ Y_ABORT_UNLESS(numErased);
+ if (task.RepliesPending.empty()) {
+ Send(task.Sender, task.Response.release(), 0, task.Cookie);
}
}
void Handle(NConsole::TEvConsole::TEvConfigNotificationResponse::TPtr& ev) {
- Forward(ev, EdgeId);
+ Cconf << "@ TEvConfigNotificationResponse from " << ev->Sender << " cookie " << ev->Cookie << Endl;
+ const auto nh = TasksPending.extract(std::make_tuple(ev->Sender, ev->Cookie));
+ Y_ABORT_UNLESS(nh);
+ Finish(*nh.mapped(), ev->Sender);
}
void Handle(NConsole::TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr& ev) {
- Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release());
+ Cconf << "@ TEvRemoveConfigSubscriptionRequest from " << ev->Sender << Endl;
+
+ for (auto it = TasksPending.lower_bound(std::make_tuple(ev->Sender, 0)); it != TasksPending.end() &&
+ std::get<0>(it->first) == ev->Sender; it = TasksPending.erase(it)) {
+ Finish(*it->second, ev->Sender);
+ }
+
+ const size_t numErased = Subscribers.erase(ev->Sender);
+ Y_ABORT_UNLESS(numErased);
+ Send(ev->Sender, MakeHolder<NConsole::TEvConsole::TEvRemoveConfigSubscriptionResponse>().Release());
}
};
diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_huge/ya.make b/ydb/core/blobstorage/ut_blobstorage/ut_huge/ya.make
new file mode 100644
index 00000000000..4df1d9171d6
--- /dev/null
+++ b/ydb/core/blobstorage/ut_blobstorage/ut_huge/ya.make
@@ -0,0 +1,21 @@
+UNITTEST_FOR(ydb/core/blobstorage/ut_blobstorage)
+
+FORK_SUBTESTS()
+
+SIZE(MEDIUM)
+
+TIMEOUT(600)
+
+SRCS(
+ huge.cpp
+)
+
+PEERDIR(
+ ydb/core/blobstorage/ut_blobstorage/lib
+)
+
+IF (SANITIZER_TYPE)
+ REQUIREMENTS(ram:32)
+ENDIF()
+
+END()
diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make
index 23a7b2ff36f..ded5f16757b 100644
--- a/ydb/core/blobstorage/ut_blobstorage/ya.make
+++ b/ydb/core/blobstorage/ut_blobstorage/ya.make
@@ -64,6 +64,7 @@ RECURSE_FOR_TESTS(
ut_blob_depot_fat
ut_donor
ut_group_reconfiguration
+ ut_huge
ut_read_only_vdisk
ut_osiris
ut_replication
diff --git a/ydb/core/blobstorage/ut_vdisk2/huge.cpp b/ydb/core/blobstorage/ut_vdisk2/huge.cpp
index 153a2a8e366..a0153f8dd6f 100644
--- a/ydb/core/blobstorage/ut_vdisk2/huge.cpp
+++ b/ydb/core/blobstorage/ut_vdisk2/huge.cpp
@@ -32,7 +32,9 @@ Y_UNIT_TEST_SUITE(VDiskTest) {
UNIT_ASSERT_VALUES_EQUAL(res.ResultSize(), 1);
const auto& value = res.GetResult(0);
UNIT_ASSERT_VALUES_EQUAL(value.GetStatus(), NKikimrProto::OK);
- UNIT_ASSERT_VALUES_EQUAL(value.GetBufferData(), *datap);
+ UNIT_ASSERT_EQUAL_C(value.GetBufferData(), *datap, "id# " << id
+ << " got# " << (int)value.GetBufferData().front()
+ << " expected# " << (int)datap->front());
}
};
@@ -61,17 +63,19 @@ Y_UNIT_TEST_SUITE(VDiskTest) {
const ui64 tabletId = tabletIds[RandomNumber(tabletIds.size())];
TTabletContext& tablet = tablets[tabletId];
- TString& data = blobValues[RandomNumber(blobValues.size())];
+ size_t blobValueIndex = RandomNumber(blobValues.size());
+ TString& data = blobValues[blobValueIndex];
TLogoBlobID id(tabletId, tablet.Gen, tablet.Step++, channel, data.size(), 0, 1);
auto res = env->Put(id, data);
UNIT_ASSERT_VALUES_EQUAL(res.GetStatus(), NKikimrProto::OK);
- Cerr << "Put id# " << id << " totalSize# " << totalSize << Endl;
+ Cerr << "Put id# " << id << " totalSize# " << totalSize << " blobValueIndex# " << blobValueIndex << Endl;
- content.emplace(id, &data);
+ const auto [it, inserted] = content.emplace(id, &data);
+ UNIT_ASSERT(inserted);
totalSize += data.size();
- if (RandomNumber(1000u) < 3) {
+ if (RandomNumber(1000u) < 100) {
ui32 minHugeBlobValue;
do {
minHugeBlobValue = minHugeBlobValues[RandomNumber(minHugeBlobValues.size())];
diff --git a/ydb/core/blobstorage/vdisk/balance/handoff_map.h b/ydb/core/blobstorage/vdisk/balance/handoff_map.h
index ddb8f2b5103..3dd5cb47862 100644
--- a/ydb/core/blobstorage/vdisk/balance/handoff_map.h
+++ b/ydb/core/blobstorage/vdisk/balance/handoff_map.h
@@ -37,23 +37,6 @@ namespace NKikimr {
}
};
- //////////////// Transformed Item //////////////////////////////////////
- struct TTransformedItem {
- TKey Key;
- const TMemRec *MemRec;
- const TDataMerger *DataMerger;
-
- // intermediate data
- TMemRec NewMemRec; // new mem rec is build here if required
- TDataMerger NewDataMerger; // new DataMerger, if we need to rebuild it
-
- TTransformedItem();
- const TTransformedItem *SetRaw(const TKey &key, const TMemRec *memRec, const TDataMerger *dataMerger);
- const TTransformedItem *SetNewDisk(const TKey &key, const TIngress &ingress, TDataMerger &dataMerger);
- const TTransformedItem *SetRmData(const TKey &key, const TMemRec *memRec, const TDataMerger *dataMerger);
- };
-
-
THandoffMap(const THullCtxPtr &hullCtx,
bool runHandoff,
const TActorId &skeletonId)
@@ -61,10 +44,6 @@ namespace NKikimr {
, Top(HullCtx->VCtx->Top)
, RunHandoff(runHandoff)
, SkeletonId(skeletonId)
- , DelMap()
- , Counter(0)
- , TrRes()
- , Stat()
{
}
@@ -80,15 +59,10 @@ namespace NKikimr {
// Transforms record according to the built handoff map. It returns item we need to write, pointer is
// valid until next call. Nullptr indicates that item is to be removed completely.
- const TTransformedItem *Transform(const TKey& key, const TMemRec* memRec,
- const TDataMerger* dataMerger, bool /*keepData*/, bool keepItem) {
+ void Transform(const TKey& /*key*/, TMemRec& /*memRec*/, TDataMerger& dataMerger, bool /*keepData*/) {
// do nothing by default, all work is done in template specialization for logo blobs
Counter++;
- if (!keepItem) {
- return nullptr;
- }
- Y_DEBUG_ABORT_UNLESS(dataMerger->Empty());
- return TrRes.SetRaw(key, memRec, dataMerger);
+ Y_DEBUG_ABORT_UNLESS(dataMerger.Empty());
}
private:
@@ -97,166 +71,61 @@ namespace NKikimr {
const bool RunHandoff;
const TActorId SkeletonId;
- TDeque<ui8> DelMap;
- unsigned Counter;
- TTransformedItem TrRes;
+ std::vector<ui8> DelMap;
+ size_t Counter = 0;
TStat Stat;
};
-
- ////////////////////////////////////////////////////////////////////////////
- // TTransformedItem implementation
- ////////////////////////////////////////////////////////////////////////////
- template <class TKey, class TMemRec>
- THandoffMap<TKey, TMemRec>::TTransformedItem::TTransformedItem()
- : Key()
- , MemRec(nullptr)
- , DataMerger(nullptr)
- , NewMemRec()
- , NewDataMerger()
- {}
-
- template <class TKey, class TMemRec>
- const typename THandoffMap<TKey, TMemRec>::TTransformedItem *
- THandoffMap<TKey, TMemRec>::TTransformedItem::SetRaw(const TKey &key, const TMemRec *memRec,
- const TDataMerger *dataMerger) {
- Key = key;
- MemRec = memRec;
- DataMerger = dataMerger;
- NewMemRec.SetNoBlob();
- NewDataMerger.Clear();
- return this;
- }
-
- template <class TKey, class TMemRec>
- const typename THandoffMap<TKey, TMemRec>::TTransformedItem *
- THandoffMap<TKey, TMemRec>::TTransformedItem::SetNewDisk(const TKey &key, const TIngress &ingress,
- TDataMerger &dataMerger) {
- NewMemRec = TMemRecLogoBlob(ingress);
- NewDataMerger.Swap(dataMerger);
- NewMemRec.SetType(NewDataMerger.GetType());
-
- Key = key;
- MemRec = &NewMemRec;
- DataMerger = &NewDataMerger;
- return this;
- }
-
- template <class TKey, class TMemRec>
- const typename THandoffMap<TKey, TMemRec>::TTransformedItem *
- THandoffMap<TKey, TMemRec>::TTransformedItem::SetRmData(const TKey &key, const TMemRec *memRec,
- const TDataMerger *dataMerger) {
- NewMemRec = TMemRecLogoBlob(memRec->GetIngress());
- NewDataMerger.SetEmptyFromAnotherMerger(dataMerger);
- NewMemRec.SetType(NewDataMerger.GetType());
-
- Key = key;
- MemRec = &NewMemRec;
- DataMerger = &NewDataMerger;
- return this;
- }
-
-
- ////////////////////////////////////////////////////////////////////////////
- // Template specialization for LogoBlobs
- ////////////////////////////////////////////////////////////////////////////
- template <>
- inline const THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::TTransformedItem *
- THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::Transform(
- const TKeyLogoBlob& key,
- const TMemRecLogoBlob* memRec,
- const TDataMerger* dataMerger,
- bool keepData,
- bool keepItem)
- {
+ template<>
+ inline void THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::Transform(const TKeyLogoBlob& key, TMemRecLogoBlob& memRec,
+ TDataMerger& dataMerger, bool keepData) {
Y_DEFER { Counter++; };
- if (!keepItem) {
- return nullptr;
- }
-
if (!keepData) {
- return TrRes.SetRmData(key, memRec, dataMerger);
+ memRec.SetNoBlob();
+ memRec.ClearLocalParts(Top->GType);
+ dataMerger.MakeEmpty(); // collect all huge blobs in the merger
+ return;
}
- const TTransformedItem *defaultResult = TrRes.SetRaw(key, memRec, dataMerger); // unchanged by default
if (!RunHandoff) {
- return defaultResult;
+ return;
}
Y_VERIFY(Counter < DelMap.size());
- TIngress ingress = memRec->GetIngress(); // ingress we are going to change
- NMatrix::TVectorType localParts = ingress.LocalParts(Top->GType);
+ TIngress ingress = memRec.GetIngress(); // ingress we are going to change
ui8 vecSize = Top->GType.TotalPartCount();
- const NMatrix::TVectorType delPlan(DelMap.at(Counter), vecSize);
- const NMatrix::TVectorType delVec = delPlan & localParts;
+ const NMatrix::TVectorType delPlan(DelMap[Counter], vecSize);
+ const NMatrix::TVectorType delVec = delPlan & ingress.LocalParts(Top->GType);
if (delVec.Empty()) {
- return defaultResult;
+ return;
}
// mark deleted handoff parts in ingress
- for (ui8 i = localParts.FirstPosition(); i != localParts.GetSize(); i = localParts.NextPosition(i)) {
- if (delVec.Get(i)) {
- const ui8 partId = i + 1;
- TLogoBlobID id(key.LogoBlobID(), partId);
- ingress.DeleteHandoff(Top.get(), HullCtx->VCtx->ShortSelfVDisk, id, true);
- localParts.Clear(i);
- }
+ for (ui8 i : delVec) {
+ // this clears local bit too
+ ingress.DeleteHandoff(Top.get(), HullCtx->VCtx->ShortSelfVDisk, TLogoBlobID(key.LogoBlobID(), i + 1), true);
}
- if (localParts.Empty()) {
- // we have deleted all parts, we can remove the record completely
- return nullptr;
- }
+ // update merger with the filtered parts (only remaining local parts are kept)
+ TBlobType::EType type;
+ ui32 inplacedDataSize = 0;
+ dataMerger.FilterLocalParts(ingress.LocalParts(Top->GType), key.LogoBlobID(), &type, &inplacedDataSize);
- TDataMerger newMerger;
- switch (memRec->GetType()) {
- case TBlobType::DiskBlob: {
- const TDiskBlob& blob = dataMerger->GetDiskBlobMerger().GetDiskBlob();
-
- // create new blob from kept parts
- for (auto it = blob.begin(); it != blob.end(); ++it) {
- if (localParts.Get(it.GetPartId() - 1)) {
- newMerger.AddPart(blob, it);
- }
- }
- break;
- }
- case TBlobType::HugeBlob:
- case TBlobType::ManyHugeBlobs: {
- const auto& oldMerger = dataMerger->GetHugeBlobMerger();
-
- auto parts = oldMerger.GetParts();
- Y_DEBUG_ABORT_UNLESS(oldMerger.SavedData().size() == parts.CountBits());
- Y_DEBUG_ABORT_UNLESS(oldMerger.SavedData().size() == oldMerger.GetCircaLsns().size());
-
- for (ui8 i = parts.FirstPosition(), j = 0; i != parts.GetSize(); i = parts.NextPosition(i), ++j) {
- if (localParts.Get(i)) {
- auto curOneHotPart = NMatrix::TVectorType::MakeOneHot(i, parts.GetSize());
- newMerger.AddHugeBlob(&oldMerger.SavedData()[j], &oldMerger.SavedData()[j] + 1, curOneHotPart, oldMerger.GetCircaLsns()[j]);
- } else {
- newMerger.AddDeletedHugeBlob(oldMerger.SavedData()[j]);
- }
- }
- break;
- }
- default: {
- Y_DEBUG_ABORT_UNLESS(false);
- return defaultResult;
- }
+ // reinstate memRec
+ memRec = TMemRecLogoBlob(ingress);
+ memRec.SetType(type);
+ if (type == TBlobType::DiskBlob && inplacedDataSize) { // update inplaced data size for inplace blob
+ memRec.SetDiskBlob(TDiskPart(0, 0, inplacedDataSize));
}
- // SetNewDisk can handle empty dataMerger correctly.
- // If dataMerger is empty, we still keep the record, it contains knowledge about
- // this logoblob, only garbage collection removes records completely
- return TrRes.SetNewDisk(key, ingress, newMerger);
+ Y_ABORT_UNLESS(memRec.GetLocalParts(Top->GType) == dataMerger.GetParts());
}
-
- template <>
- template <class TIterator>
+ template<>
+ template<class TIterator>
inline void THandoffMap<TKeyLogoBlob, TMemRecLogoBlob>::BuildMap(
const TLevelIndexSnapshot& levelSnap,
const TIterator& i)
diff --git a/ydb/core/blobstorage/vdisk/common/disk_part.h b/ydb/core/blobstorage/vdisk/common/disk_part.h
index 25faf131bcf..f06c23c7ec1 100644
--- a/ydb/core/blobstorage/vdisk/common/disk_part.h
+++ b/ydb/core/blobstorage/vdisk/common/disk_part.h
@@ -258,3 +258,7 @@ struct THash<NKikimr::TDiskPart> {
}
};
+template<>
+inline void Out<NKikimr::TDiskPart>(IOutputStream& s, const NKikimr::TDiskPart& x) {
+ s << x.ToString();
+}
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
index d9fa9ee0144..540edfaac51 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
@@ -39,6 +39,7 @@ namespace NKikimr {
HullCompLevelRateThreshold = 1.0;
HullCompFreeSpaceThreshold = 2.0;
FreshCompMaxInFlightWrites = 10;
+ FreshCompMaxInFlightReads = 10; // when moving huge blobs
HullCompMaxInFlightWrites = 10;
HullCompMaxInFlightReads = 20;
HullCompReadBatchEfficiencyThreshold = 0.5; // don't issue reads if there are more gaps than the useful data
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.h b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
index 3632857e29d..dd80ff86cef 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_config.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
@@ -128,6 +128,7 @@ namespace NKikimr {
double HullCompLevelRateThreshold;
double HullCompFreeSpaceThreshold;
ui32 FreshCompMaxInFlightWrites;
+ ui32 FreshCompMaxInFlightReads;
ui32 HullCompMaxInFlightWrites;
ui32 HullCompMaxInFlightReads;
double HullCompReadBatchEfficiencyThreshold;
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 9a6962b9db3..b99db787e0b 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -3073,6 +3073,15 @@ namespace NKikimr {
TLogoBlobID BlobId; // for HugeBlob/InplaceBlob
ui64 SstId; // for IndexRecord
ui32 Level; // for IndexRecord
+
+ TString ToString() const {
+ return TStringBuilder() << "{Location# " << Location.ToString()
+ << " Database# " << (int)Database
+ << " RecordType# " << (int)RecordType
+ << " BlobId# " << BlobId.ToString()
+ << " SstId# " << SstId
+ << " Level# " << Level << '}';
+ }
};
std::vector<TLayoutRecord> Layout;
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
index 1c08c0be927..5a26e1e16ff 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
@@ -64,7 +64,6 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
class THugeBlobCtx {
public:
- // this value is multiply of AppendBlockSize and is calculated from Config->MinHugeBlobSize
const std::shared_ptr<const THugeSlotsMap> HugeSlotsMap;
const bool AddHeader;
diff --git a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h
index baf4290bd9c..be569ee9868 100644
--- a/ydb/core/blobstorage/vdisk/defrag/defrag_search.h
+++ b/ydb/core/blobstorage/vdisk/defrag/defrag_search.h
@@ -78,6 +78,7 @@ namespace NKikimr {
, Barriers(FullSnap.BarriersSnap.CreateEssence(FullSnap.HullCtx))
, AllowKeepFlags(FullSnap.HullCtx->AllowKeepFlags)
, Iter(FullSnap.HullCtx, &FullSnap.LogoBlobsSnap)
+ , Merger(GType, false /* addHeader doesn't really matter here */)
{
Iter.SeekToFirst();
}
@@ -115,26 +116,22 @@ namespace NKikimr {
}
void Finish() {
+ TBlobType::EType type;
+ ui32 inplacedDataSize;
+ Merger.Finish(true, Key.LogoBlobID(), &type, &inplacedDataSize);
if (!Merger.Empty()) {
- Y_ABORT_UNLESS(!Merger.HasSmallBlobs());
NGc::TKeepStatus status = Barriers->Keep(Key, MemRec, {}, AllowKeepFlags, true /*allowGarbageCollection*/);
- const auto& hugeMerger = Merger.GetHugeBlobMerger();
- const auto& local = MemRec.GetIngress().LocalParts(GType);
- ui8 partIdx = local.FirstPosition();
- for (const TDiskPart& part : hugeMerger.SavedData()) {
- Y_ABORT_UNLESS(partIdx != local.GetSize());
- if (part.ChunkIdx) {
+ for (const TDiskPart& part : Merger.GetSavedHugeBlobs()) {
+ if (!part.Empty()) {
static_cast<TDerived&>(*this).Add(part, Key.LogoBlobID(), status.KeepData);
}
- partIdx = local.NextPosition(partIdx);
}
- for (const TDiskPart& part : hugeMerger.DeletedData()) {
- if (part.ChunkIdx) {
- static_cast<TDerived&>(*this).Add(part, Key.LogoBlobID(), false);
- }
+ for (const TDiskPart& part : Merger.GetDeletedHugeBlobs()) {
+ Y_ABORT_UNLESS(!part.Empty());
+ static_cast<TDerived&>(*this).Add(part, Key.LogoBlobID(), false);
}
- Merger.Clear();
}
+ Merger.Clear();
}
void Update(const TMemRecLogoBlob &memRec, const TDiskPart *outbound, ui64 lsn) {
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
index 4337654c227..8e406994529 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
@@ -285,7 +285,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
////////////////////////////////////////////////////////////////////////////
class THullHugeBlobChunkAllocator : public TActorBootstrapped<THullHugeBlobChunkAllocator> {
std::shared_ptr<THugeKeeperCtx> HugeKeeperCtx;
- const TActorId NotifyID;
+ TActorId ParentId;
ui64 Lsn;
std::shared_ptr<THullHugeKeeperPersState> Pers;
ui32 ChunkId = 0;
@@ -294,7 +294,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
friend class TActorBootstrapped<THullHugeBlobChunkAllocator>;
- void Bootstrap(const TActorContext &ctx) {
+ void Bootstrap(TActorId parentId, const TActorContext &ctx) {
+ ParentId = parentId;
// reserve chunk
LOG_DEBUG(ctx, BS_HULLHUGE,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "ChunkAllocator: bootstrap"));
@@ -348,7 +349,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "ChunkAllocator: committed:"
" chunkId# %" PRIu32 " LsnSeg# %" PRIu64, ChunkId, Lsn));
- ctx.Send(NotifyID, new TEvHullHugeChunkAllocated(ChunkId, SlotSize));
+ ctx.Send(ParentId, new TEvHullHugeChunkAllocated(ChunkId, SlotSize));
Die(ctx);
Span.EndOk();
}
@@ -371,10 +372,9 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
return NKikimrServices::TActivity::BS_HULL_HUGE_BLOB_CHUNKALLOC;
}
- THullHugeBlobChunkAllocator(std::shared_ptr<THugeKeeperCtx> hugeKeeperCtx, const TActorId &notifyID,
+ THullHugeBlobChunkAllocator(std::shared_ptr<THugeKeeperCtx> hugeKeeperCtx,
std::shared_ptr<THullHugeKeeperPersState> pers, NWilson::TTraceId traceId, ui32 slotSize)
: HugeKeeperCtx(std::move(hugeKeeperCtx))
- , NotifyID(notifyID)
, Pers(std::move(pers))
, Span(TWilson::VDiskTopLevel, std::move(traceId), "VDisk.HullHugeBlobChunkAllocator")
, SlotSize(slotSize)
@@ -705,8 +705,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
return true;
} else if (AllocatingChunkPerSlotSize.insert(slotSize).second) {
LWTRACK(HugeBlobChunkAllocatorStart, ev->Get()->Orbit);
- auto aid = ctx.RegisterWithSameMailbox(new THullHugeBlobChunkAllocator(HugeKeeperCtx, ctx.SelfID,
- State.Pers, std::move(traceId), slotSize));
+ auto aid = ctx.RegisterWithSameMailbox(new THullHugeBlobChunkAllocator(HugeKeeperCtx, State.Pers,
+ std::move(traceId), slotSize));
ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
}
if (putToQueue) {
@@ -871,14 +871,14 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
const size_t numErased = AllocatingChunkPerSlotSize.erase(msg->SlotSize);
Y_ABORT_UNLESS(numErased == 1);
ActiveActors.Erase(ev->Sender);
+ ProcessAllocateSlotTasks(msg->SlotSize, ctx);
ProcessQueue(msg->SlotSize, ctx);
}
void Handle(TEvHullFreeHugeSlots::TPtr &ev, const TActorContext &ctx) {
TEvHullFreeHugeSlots *msg = ev->Get();
- Y_ABORT_UNLESS(!msg->HugeBlobs.Empty());
- if (CheckPendingWrite(msg->WId, ev, msg->DeletionLsn, "FreeHugeSlots")) {
+ if (msg->WId && CheckPendingWrite(msg->WId, ev, msg->DeletionLsn, "FreeHugeSlots")) {
return;
}
@@ -887,6 +887,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
"THullHugeKeeper: TEvHullFreeHugeSlots: %s", msg->ToString().data()));
THashSet<ui32> slotSizes;
+
for (const auto &x : msg->HugeBlobs) {
slotSizes.insert(State.Pers->Heap->SlotSizeOfThisSize(x.Size));
NHuge::TFreeRes freeRes = State.Pers->Heap->Free(x);
@@ -898,6 +899,11 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
++State.ItemsAfterCommit;
}
+ for (const TDiskPart& x : msg->AllocatedBlobs) {
+ const bool deleted = State.Pers->DeleteSlotInFlight(State.Pers->Heap->ConvertDiskPartToHugeSlot(x));
+ Y_ABORT_UNLESS(deleted);
+ }
+
auto checkAndSet = [this, msg] (ui64 &dbLsn) {
ui64 origRecoveredLsn = HugeKeeperCtx->LsnMngr->GetOriginallyRecoveredLsn();
Y_VERIFY_S(dbLsn <= msg->DeletionLsn, HugeKeeperCtx->VCtx->VDiskLogPrefix << " Check failed:"
@@ -1058,6 +1064,103 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
HugeKeeperCtx->VCtx->GetHugeHeapFragmentation().Set(stat.CurrentlyUsedChunks, stat.CanBeFreedChunks);
}
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ struct TAllocateSlotsTask {
+ TActorId Sender;
+ ui64 Cookie;
+ std::vector<ui32> BlobSizes;
+ std::vector<TDiskPart> Result;
+ THashMap<ui32, TDynBitMap> Pending;
+
+ TAllocateSlotsTask(TEvHugeAllocateSlots::TPtr& ev)
+ : Sender(ev->Sender)
+ , Cookie(ev->Cookie)
+ , BlobSizes(std::move(ev->Get()->BlobSizes))
+ , Result(BlobSizes.size())
+ {}
+ };
+
+ std::set<std::tuple<ui32, std::shared_ptr<TAllocateSlotsTask>>> SlotSizeToTask;
+
+ void TryToFulfillTask(const std::shared_ptr<TAllocateSlotsTask>& task, ui32 slotSizeToProcess, const TActorContext& ctx) {
+ bool done = true;
+
+ auto processItem = [&](size_t index, TDynBitMap *pending) {
+ auto& result = task->Result[index];
+ Y_DEBUG_ABORT_UNLESS(result.Empty());
+
+ NHuge::THugeSlot hugeSlot;
+ ui32 slotSize;
+ if (State.Pers->Heap->Allocate(task->BlobSizes[index], &hugeSlot, &slotSize)) {
+ State.Pers->AddSlotInFlight(hugeSlot);
+ State.Pers->AddChunkSize(hugeSlot);
+ result = hugeSlot.GetDiskPart();
+ if (pending) {
+ Y_DEBUG_ABORT_UNLESS(pending->Get(index));
+ pending->Reset(index);
+ }
+ } else {
+ if (AllocatingChunkPerSlotSize.insert(slotSize).second) {
+ auto aid = ctx.RegisterWithSameMailbox(new THullHugeBlobChunkAllocator(HugeKeeperCtx,
+ State.Pers, {}, slotSize));
+ ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
+ }
+ done = false;
+ SlotSizeToTask.emplace(slotSize, task);
+ if (pending) {
+ Y_DEBUG_ABORT_UNLESS(pending->Get(index));
+ return false;
+ }
+ task->Pending[slotSize].Set(index);
+ }
+ return true;
+ };
+
+ if (slotSizeToProcess) {
+ const auto it = task->Pending.find(slotSizeToProcess);
+ Y_ABORT_UNLESS(it != task->Pending.end());
+ auto& pending = it->second;
+ Y_FOR_EACH_BIT(index, pending) {
+ if (!processItem(index, &pending)) {
+ break;
+ }
+ }
+ } else {
+ for (size_t i = 0; i < task->BlobSizes.size(); ++i) {
+ processItem(i, nullptr);
+ }
+ }
+
+ if (done) {
+ LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLHUGE, HugeKeeperCtx->VCtx->VDiskLogPrefix
+ << "TEvHugeAllocateSlotsResult# " << FormatList(task->Result));
+ Send(task->Sender, new TEvHugeAllocateSlotsResult(std::move(task->Result)), 0, task->Cookie);
+ }
+ }
+
+ void ProcessAllocateSlotTasks(ui32 slotSize, const TActorContext& ctx) {
+ auto it = SlotSizeToTask.lower_bound(std::make_tuple(slotSize, nullptr));
+ while (it != SlotSizeToTask.end() && std::get<0>(*it) == slotSize) {
+ auto node = SlotSizeToTask.extract(it++);
+ TryToFulfillTask(std::get<1>(node.value()), slotSize, ctx);
+ }
+ }
+
+ void Handle(TEvHugeAllocateSlots::TPtr ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLHUGE, HugeKeeperCtx->VCtx->VDiskLogPrefix
+ << "TEvHugeAllocateSlots# " << FormatList(ev->Get()->BlobSizes));
+ TryToFulfillTask(std::make_shared<TAllocateSlotsTask>(ev), 0, ctx);
+ }
+
+ void Handle(TEvHugeDropAllocatedSlots::TPtr ev, const TActorContext& /*ctx*/) {
+ for (const auto& p : ev->Get()->Locations) {
+ State.Pers->Heap->Free(p);
+ const bool deleted = State.Pers->DeleteSlotInFlight(State.Pers->Heap->ConvertDiskPartToHugeSlot(p));
+ Y_ABORT_UNLESS(deleted);
+ }
+ }
+
//////////// Event Handlers ////////////////////////////////////
STFUNC(StateFunc) {
@@ -1070,6 +1173,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
HFunc(TEvHullHugeCommitted, Handle)
HFunc(TEvHullHugeWritten, Handle)
HFunc(TEvHullHugeBlobLogged, Handle)
+ HFunc(TEvHugeAllocateSlots, Handle)
+ HFunc(TEvHugeDropAllocatedSlots, Handle)
HFunc(TEvHugePreCompact, Handle)
HFunc(TEvHugeLockChunks, Handle)
HFunc(TEvHugeUnlockChunks, Handle)
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
index e3a09814f16..aaa1ede6d3e 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
@@ -131,12 +131,15 @@ namespace NKikimr {
class TEvHullFreeHugeSlots : public TEventLocal<TEvHullFreeHugeSlots, TEvBlobStorage::EvHullFreeHugeSlots> {
public:
const TDiskPartVec HugeBlobs;
+ const TDiskPartVec AllocatedBlobs;
const ui64 DeletionLsn;
const TLogSignature Signature; // identifies database we send update for
const ui64 WId;
- TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn, TLogSignature signature, ui64 wId)
+ TEvHullFreeHugeSlots(TDiskPartVec&& hugeBlobs, TDiskPartVec&& allocatedBlobs, ui64 deletionLsn,
+ TLogSignature signature, ui64 wId)
: HugeBlobs(std::move(hugeBlobs))
+ , AllocatedBlobs(std::move(allocatedBlobs))
, DeletionLsn(deletionLsn)
, Signature(signature)
, WId(wId)
@@ -145,7 +148,7 @@ namespace NKikimr {
TString ToString() const {
TStringStream str;
str << "{" << Signature.ToString() << " DelLsn# " << DeletionLsn << " Slots# " << HugeBlobs.ToString()
- << " WId# " << WId << "}";
+ << " Allocated# " << AllocatedBlobs.ToString() << " WId# " << WId << "}";
return str.Str();
}
};
@@ -216,6 +219,30 @@ namespace NKikimr {
TEvHugePreCompactResult(ui64 wId) : WId(wId) {}
};
+ struct TEvHugeAllocateSlots : TEventLocal<TEvHugeAllocateSlots, TEvBlobStorage::EvHugeAllocateSlots> {
+ std::vector<ui32> BlobSizes;
+
+ TEvHugeAllocateSlots(std::vector<ui32> blobSizes)
+ : BlobSizes(std::move(blobSizes))
+ {}
+ };
+
+ struct TEvHugeAllocateSlotsResult : TEventLocal<TEvHugeAllocateSlotsResult, TEvBlobStorage::EvHugeAllocateSlotsResult> {
+ std::vector<TDiskPart> Locations;
+
+ TEvHugeAllocateSlotsResult(std::vector<TDiskPart> locations)
+ : Locations(std::move(locations))
+ {}
+ };
+
+ struct TEvHugeDropAllocatedSlots : TEventLocal<TEvHugeDropAllocatedSlots, TEvBlobStorage::EvHugeDropAllocatedSlots> {
+ std::vector<TDiskPart> Locations;
+
+ TEvHugeDropAllocatedSlots(std::vector<TDiskPart> locations)
+ : Locations(std::move(locations))
+ {}
+ };
+
////////////////////////////////////////////////////////////////////////////
// THugeKeeperCtx
////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h
index ef5ce75f88b..b1beef6571f 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h
@@ -207,169 +207,6 @@ namespace NKikimr {
TString ToString() const;
};
-
- ////////////////////////////////////////////////////////////////////////////
- // TBlobMerger
- ////////////////////////////////////////////////////////////////////////////
- class TBlobMerger {
- using TCircaLsns = TVector<ui64>;
-
- public:
- TBlobMerger() = default;
-
- void Clear() {
- DiskPtrs.clear();
- Deleted.clear();
- Parts.Clear();
- CircaLsns.clear();
- }
-
- void SetEmptyFromAnotherMerger(const TBlobMerger *fromMerger) {
- Clear();
- Deleted.insert(Deleted.end(), fromMerger->SavedData().begin(), fromMerger->SavedData().end());
- Deleted.insert(Deleted.end(), fromMerger->DeletedData().begin(), fromMerger->DeletedData().end());
- }
-
- void Add(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType &parts, ui64 circaLsn) {
- if (DiskPtrs.empty()) {
- Parts = parts;
- DiskPtrs = {begin, end};
- CircaLsns = TCircaLsns(end - begin, circaLsn);
- Y_ABORT_UNLESS(DiskPtrs.size() == Parts.CountBits());
- } else {
- Merge(begin, end, parts, circaLsn);
- }
- }
-
- void AddDeletedPart(const TDiskPart &part) {
- Deleted.push_back(part);
- }
-
- void AddMetadataParts(NMatrix::TVectorType parts) {
- // this is special case for mirror3of4, where data can have empty TDiskPart
- std::array<TDiskPart, 8> zero;
- zero.fill(TDiskPart());
- // empty TDiskPart at a position means that every circaLsn shoud work
- const ui64 circaLsn = Max<ui64>();
- Merge(zero.begin(), zero.begin() + parts.CountBits(), parts, circaLsn);
- }
-
- bool Empty() const {
- return Parts.Empty();
- }
-
- void Swap(TBlobMerger &m) {
- DiskPtrs.swap(m.DiskPtrs);
- Deleted.swap(m.Deleted);
- Parts.Swap(m.Parts);
- CircaLsns.swap(m.CircaLsns);
- }
-
- TBlobType::EType GetBlobType() const {
- Y_ABORT_UNLESS(!Empty());
- return DiskPtrs.size() == 1 ? TBlobType::HugeBlob : TBlobType::ManyHugeBlobs;
- }
-
- ui32 GetNumParts() const {
- return DiskPtrs.size();
- }
-
- void Output(IOutputStream &str) const {
- if (Empty()) {
- str << "empty";
- } else {
- str << "{Parts# " << Parts.ToString();
- str << " CircaLsns# " << FormatList(CircaLsns);
- str << " DiskPtrs# ";
- FormatList(str, DiskPtrs);
- str << " Deleted# ";
- FormatList(str, Deleted);
- str << "}";
- }
- }
-
- TString ToString() const {
- TStringStream str;
- Output(str);
- return str.Str();
- }
-
- const NMatrix::TVectorType& GetParts() const {
- return Parts;
- }
-
- const TVector<TDiskPart> &SavedData() const {
- return DiskPtrs;
- }
-
- const TVector<TDiskPart> &DeletedData() const {
- return Deleted;
- }
-
- const TCircaLsns &GetCircaLsns() const {
- return CircaLsns;
- }
-
- private:
- typedef TVector<TDiskPart> TDiskPtrs;
-
- TDiskPtrs DiskPtrs;
- TDiskPtrs Deleted;
- NMatrix::TVectorType Parts;
- TCircaLsns CircaLsns;
-
- void Merge(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType &parts, ui64 circaLsn)
- {
- Y_ABORT_UNLESS(end - begin == parts.CountBits());
- Y_DEBUG_ABORT_UNLESS(Parts.GetSize() == parts.GetSize());
- const ui8 maxSize = parts.GetSize();
- TDiskPtrs newDiskPtrs;
- TCircaLsns newCircaLsns;
- newDiskPtrs.reserve(maxSize);
- newCircaLsns.reserve(maxSize);
-
- TDiskPtrs::const_iterator locBegin = DiskPtrs.begin();
- TDiskPtrs::const_iterator locEnd = DiskPtrs.end();
- TDiskPtrs::const_iterator locIt = locBegin;
- const TDiskPart *it = begin;
-
- for (ui8 i = 0; i < maxSize; i++) {
- if (Parts.Get(i) && parts.Get(i)) {
- // both
- Y_DEBUG_ABORT_UNLESS(locIt != locEnd && it != end);
- if (CircaLsns[locIt - locBegin] < circaLsn) {
- // incoming value wins
- newDiskPtrs.push_back(*it);
- newCircaLsns.push_back(circaLsn);
- Deleted.push_back(*locIt);
- } else {
- // already seen value wins
- newDiskPtrs.push_back(*locIt);
- newCircaLsns.push_back(CircaLsns[locIt - locBegin]);
- Deleted.push_back(*it);
- }
- ++locIt;
- ++it;
- } else if (Parts.Get(i)) {
- Y_DEBUG_ABORT_UNLESS(locIt != locEnd);
- newDiskPtrs.push_back(*locIt);
- newCircaLsns.push_back(CircaLsns[locIt - locBegin]);
- ++locIt;
- } else if (parts.Get(i)) {
- Y_DEBUG_ABORT_UNLESS(it != end);
- newDiskPtrs.push_back(*it);
- newCircaLsns.push_back(circaLsn);
- ++it;
- }
- }
-
- Parts |= parts;
- DiskPtrs.swap(newDiskPtrs);
- CircaLsns.swap(newCircaLsns);
- Y_ABORT_UNLESS(DiskPtrs.size() == Parts.CountBits());
- }
- };
-
} // NHuge
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs_ut.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs_ut.cpp
deleted file mode 100644
index 9212c2fc301..00000000000
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs_ut.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-#include "blobstorage_hullhugedefs.h"
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/stream/null.h>
-
-#define STR Cnull
-
-
-namespace NKikimr {
-
- using namespace NHuge;
-
- Y_UNIT_TEST_SUITE(TBlobStorageHullHugeBlobMerger) {
-
- Y_UNIT_TEST(Basic) {
- TBlobMerger merger;
-
- const ui32 blobSize = 128u << 10u;
- NMatrix::TVectorType parts1(0, 6);
- parts1.Set(1);
- parts1.Set(3);
- TDiskPart diskParts1[2] = {{1, blobSize * 2u, blobSize}, {2, blobSize * 4u, blobSize}};
-
- NMatrix::TVectorType parts2(0, 6);
- parts2.Set(2);
- TDiskPart diskParts2[1] = {{3, blobSize * 6u, blobSize}};
-
- NMatrix::TVectorType parts3(0, 6);
- parts3.Set(2);
- parts3.Set(3);
- TDiskPart diskParts3[2] = {{4, blobSize * 8u, blobSize}, {5, blobSize * 10u, blobSize}};
-
-
- merger.Add(diskParts1, diskParts1 + 2, parts1, 4); // circaLsn = 4
- STR << merger.ToString() << "\n";
- TVector<TDiskPart> res1 = {{1, blobSize * 2u, blobSize}, {2, blobSize * 4u, blobSize}};
- TVector<ui64> sstIds1{4, 4};
- UNIT_ASSERT(merger.GetCircaLsns() == sstIds1);
- UNIT_ASSERT(merger.SavedData() == res1);
- UNIT_ASSERT(merger.DeletedData().empty());
-
- merger.Add(diskParts2, diskParts2 + 1, parts2, 6); // circaLsn = 6
- STR << merger.ToString() << "\n";
- TVector<TDiskPart> res2 = {{1, blobSize * 2u, blobSize}, {3, blobSize * 6u, blobSize},
- {2, blobSize * 4u, blobSize}};
- TVector<ui64> sstIds2{4, 6, 4};
- UNIT_ASSERT(merger.GetCircaLsns() == sstIds2);
- UNIT_ASSERT(merger.SavedData() == res2);
- UNIT_ASSERT(merger.DeletedData().empty());
-
- merger.Add(diskParts3, diskParts3 + 2, parts3, 5); // circaLsn = 5
- STR << merger.ToString() << "\n";
- TVector<TDiskPart> res3 = {{1, blobSize * 2u, blobSize}, {3, blobSize * 6u, blobSize},
- {5, blobSize * 10u, blobSize}};
- TVector<TDiskPart> deleted3 = {{4, blobSize * 8u, blobSize}, {2, blobSize * 4u, blobSize}};
- TVector<ui64> sstIds3{4, 6, 5};
- UNIT_ASSERT(merger.GetCircaLsns() == sstIds3);
- UNIT_ASSERT(merger.SavedData() == res3);
- UNIT_ASSERT(merger.DeletedData() == deleted3);
- }
- }
-
-} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp
index f874b1c5d58..898271ef6d4 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp
@@ -428,6 +428,7 @@ namespace NKikimr {
const TActorContext &ctx,
ui64 lsn,
const TDiskPartVec &rec,
+ const TDiskPartVec& allocated,
ESlotDelDbType type)
{
ui64 *logPosDelLsn = nullptr;
@@ -446,25 +447,24 @@ namespace NKikimr {
}
if (lsn > *logPosDelLsn) {
// apply
- LOG_DEBUG(ctx, BS_HULLHUGE,
- VDISKP(VCtx->VDiskLogPrefix,
- "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 " entryLsn# %" PRIu64 "): "
- "RmHugeBlobs apply: %s",
- Guid, lsn, LogPos.EntryPointLsn, rec.ToString().data()));
+ LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, "Recovery(guid# %" PRIu64 " lsn# %" PRIu64
+ " entryLsn# %" PRIu64 "): " "RmHugeBlobs apply: %s", Guid, lsn, LogPos.EntryPointLsn,
+ rec.ToString().data()));
for (const auto &x : rec) {
Heap->RecoveryModeFree(x);
}
+ for (const auto& x : allocated) {
+ Heap->RecoveryModeAllocate(x);
+ }
*logPosDelLsn = lsn;
PersistentLsn = Min(PersistentLsn, lsn);
return TRlas(true, false);
} else {
// skip
- LOG_DEBUG(ctx, BS_HULLHUGE,
- VDISKP(VCtx->VDiskLogPrefix,
- "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 " entryLsn# %" PRIu64 "): "
- "RmHugeBlobs skip: %s",
- Guid, lsn, LogPos.EntryPointLsn, rec.ToString().data()));
+ LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, "Recovery(guid# %" PRIu64 " lsn# %" PRIu64
+ " entryLsn# %" PRIu64 "): " "RmHugeBlobs skip: %s", Guid, lsn, LogPos.EntryPointLsn,
+ rec.ToString().data()));
return TRlas(true, true);
}
}
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h
index f7b04ba7b42..ffac5bdb0ca 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h
@@ -175,6 +175,7 @@ namespace NKikimr {
TRlas ApplySlotsDeletion(const TActorContext &ctx,
ui64 lsn,
const TDiskPartVec &rec,
+ const TDiskPartVec& allocated,
ESlotDelDbType type);
TRlas Apply(const TActorContext &ctx,
ui64 lsn,
diff --git a/ydb/core/blobstorage/vdisk/huge/ut/ya.make b/ydb/core/blobstorage/vdisk/huge/ut/ya.make
index ef3c4f7755a..f5a1d4b2d9c 100644
--- a/ydb/core/blobstorage/vdisk/huge/ut/ya.make
+++ b/ydb/core/blobstorage/vdisk/huge/ut/ya.make
@@ -16,7 +16,6 @@ PEERDIR(
)
SRCS(
- blobstorage_hullhugedefs_ut.cpp
blobstorage_hullhugeheap_ctx_ut.cpp
blobstorage_hullhugeheap_ut.cpp
blobstorage_hullhuge_ut.cpp
diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h
index d849b4b9b41..c4b9f4e8ccc 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h
@@ -110,7 +110,7 @@ namespace NKikimr {
}
}
- TRope GetPart(ui8 part, TRope *holder) const {
+ const TRope& GetPart(ui8 part, TRope *holder) const {
return GetPart(part, 0, GetPartSize(part), holder);
}
@@ -266,27 +266,41 @@ namespace NKikimr {
// used by blob merger
void MergePart(const TDiskBlob& source, TPartIterator iter) {
- const ui8 part = iter.GetPartId() - 1;
- Y_ABORT_UNLESS(!Rope); // ensure that this blob is used inside merger
- Y_ABORT_UNLESS(FullDataSize == 0 || FullDataSize == source.FullDataSize, "FullDataSize# %" PRIu32 " source.FullDataSize# %" PRIu32,
- FullDataSize, source.FullDataSize);
+ if (const ui8 partIdx = iter.GetPartId() - 1; Parts.Empty() || !Parts.Get(partIdx)) {
+ MergePart(iter.GetPart(), partIdx, source.FullDataSize, source.Parts.GetSize());
+ }
+ }
+
+ void MergePart(TRope&& data, ui8 partIdx, ui64 fullDataSize, ui8 totalPartCount) {
+ Y_ABORT_UNLESS(!Rope);
+ Y_ABORT_UNLESS(FullDataSize == 0 || FullDataSize == fullDataSize);
if (Parts.Empty()) {
- Parts = NMatrix::TVectorType(0, source.Parts.GetSize());
+ Parts = NMatrix::TVectorType(0, totalPartCount);
PartOffs.fill(0); // we don't care about absolute offsets here
} else {
- Y_ABORT_UNLESS(Parts.GetSize() == source.Parts.GetSize());
+ Y_ABORT_UNLESS(Parts.GetSize() == totalPartCount);
}
- if (!Parts.Get(part)) {
- Parts.Set(part);
- TRope partData = iter.GetPart();
- for (ui8 i = part + 1; i <= Parts.GetSize(); ++i) {
- PartOffs[i] += partData.GetSize();
+ if (!Parts.Get(partIdx)) {
+ Parts.Set(partIdx);
+ for (ui8 i = partIdx + 1; i <= Parts.GetSize(); ++i) {
+ PartOffs[i] += data.GetSize();
}
- Y_ABORT_UNLESS(part < PartData.size());
- PartData[part] = std::move(partData);
- FullDataSize = source.FullDataSize;
+ Y_ABORT_UNLESS(partIdx < PartData.size());
+ PartData[partIdx] = std::move(data);
+ FullDataSize = fullDataSize;
+ }
+ }
+
+ void ClearPart(ui8 partIdx) {
+ Y_DEBUG_ABORT_UNLESS(!Parts.Empty());
+ Y_DEBUG_ABORT_UNLESS(Parts.Get(partIdx));
+ Parts.Clear(partIdx);
+ const TRope data = std::exchange(PartData[partIdx], {});
+ const ui32 size = data.GetSize();
+ for (ui8 i = partIdx + 1; i <= Parts.GetSize(); ++i) {
+ PartOffs[i] -= size;
}
}
@@ -316,8 +330,11 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
class TDiskBlobMerger {
public:
- TDiskBlobMerger()
- {}
+ TDiskBlobMerger() = default;
+
+ TDiskBlobMerger(const TDiskBlob& blob) {
+ Add(blob);
+ }
void Clear() {
Blob = {};
@@ -332,6 +349,15 @@ namespace NKikimr {
Blob.MergePart(source, it);
}
+ void AddPart(TRope&& data, TBlobStorageGroupType gtype, const TLogoBlobID& id) {
+ Y_DEBUG_ABORT_UNLESS(id.PartId());
+ Blob.MergePart(std::move(data), id.PartId() - 1, id.BlobSize(), gtype.TotalPartCount());
+ }
+
+ void ClearPart(ui8 partIdx) {
+ Blob.ClearPart(partIdx);
+ }
+
bool Empty() const {
return Blob.Empty();
}
@@ -372,54 +398,4 @@ namespace NKikimr {
TDiskBlob Blob;
};
- class TDiskBlobMergerWithMask : public TDiskBlobMerger {
- public:
- TDiskBlobMergerWithMask() = default;
- TDiskBlobMergerWithMask(const TDiskBlobMergerWithMask&) = default;
- TDiskBlobMergerWithMask(TDiskBlobMergerWithMask&&) = default;
-
- TDiskBlobMergerWithMask(const TDiskBlobMerger& base, NMatrix::TVectorType mask)
- : AddFilterMask(mask)
- {
- // TODO(alexvru): check for saneness; maybe we shall not provide blobs not in mask?
- const TDiskBlob& blob = base.GetDiskBlob();
- for (auto it = blob.begin(); it != blob.end(); ++it) {
- AddPart(blob, it);
- }
- }
-
- void Clear() {
- TDiskBlobMerger::Clear();
- AddFilterMask.Clear();
- }
-
- void SetFilterMask(NMatrix::TVectorType mask) {
- Y_ABORT_UNLESS(!AddFilterMask);
- AddFilterMask = mask;
- }
-
- void Add(const TDiskBlob &addBlob) {
- Y_ABORT_UNLESS(AddFilterMask);
- NMatrix::TVectorType addParts = addBlob.GetParts() & *AddFilterMask;
- if (!addParts.Empty()) {
- TDiskBlobMerger::AddImpl(addBlob, addParts);
- }
- }
-
- void AddPart(const TDiskBlob& source, const TDiskBlob::TPartIterator& it) {
- Y_ABORT_UNLESS(AddFilterMask);
- if (AddFilterMask->Get(it.GetPartId() - 1)) {
- TDiskBlobMerger::AddPart(source, it);
- }
- }
-
- void Swap(TDiskBlobMergerWithMask& m) {
- TDiskBlobMerger::Swap(m);
- DoSwap(AddFilterMask, m.AddFilterMask);
- }
-
- private:
- TMaybe<NMatrix::TVectorType> AddFilterMask;
- };
-
} // NKikimr
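For orientation, the offset bookkeeping behind the new MergePart/ClearPart pair amounts to a prefix-sum array of part offsets: adding a part shifts every following offset forward by its size, clearing a part shifts them back. A minimal standalone sketch of that idea (hypothetical TPartsSketch type with a fixed part count, not the real TDiskBlob):

    #include <array>
    #include <cassert>
    #include <cstdint>
    #include <string>

    // Simplified sketch: PartOffs[i] is the byte offset of part i inside the
    // concatenated blob and PartOffs[N] is the total size. Adding or clearing
    // part p shifts every offset after p by the part's size.
    struct TPartsSketch {
        static constexpr size_t N = 3;                // total part count (assumption)
        std::array<uint32_t, N + 1> PartOffs{};       // offsets; PartOffs[N] == total size
        std::array<std::string, N> PartData;          // payload of each stored part
        std::array<bool, N> Present{};                // which parts are stored

        void MergePart(std::string data, size_t partIdx) {
            if (Present[partIdx]) return;             // part already stored, keep the old copy
            Present[partIdx] = true;
            for (size_t i = partIdx + 1; i <= N; ++i)
                PartOffs[i] += data.size();           // shift following offsets forward
            PartData[partIdx] = std::move(data);
        }

        void ClearPart(size_t partIdx) {
            assert(Present[partIdx]);
            Present[partIdx] = false;
            const uint32_t size = PartData[partIdx].size();
            for (size_t i = partIdx + 1; i <= N; ++i)
                PartOffs[i] -= size;                  // shift following offsets back
            PartData[partIdx].clear();
        }
    };

    int main() {
        TPartsSketch s;
        s.MergePart("aaaa", 0);
        s.MergePart("bb", 2);
        assert(s.PartOffs[3] == 6);                   // total size of stored parts
        s.ClearPart(0);
        assert(s.PartOffs[3] == 2);                   // only part 2 remains
    }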
diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp
index 70b622d79bd..21548fa0759 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob_ut.cpp
@@ -143,53 +143,6 @@ namespace NKikimr {
}
}
}
-
- Y_UNIT_TEST(FilterMask) {
- for (bool addHeader1 : {true, false}) {
- for (bool addHeader2 : {true, false}) {
- for (bool addHeader3 : {true, false}) {
- const ui8 numParts = 6;
- for (ui32 mask1 = 1; mask1 < (1 << numParts); ++mask1) {
- for (ui32 mask2 = 1; mask2 < (1 << numParts); ++mask2) {
- if (!(mask1 & mask2)) {
- continue;
- }
-
- NMatrix::TVectorType partsToStore(0, numParts);
- for (ui8 i = 0; i < numParts; ++i) {
- if (mask2 >> i & 1) {
- partsToStore.Set(i);
- }
- }
-
- TDiskBlobMergerWithMask m;
- m.SetFilterMask(partsToStore);
- for (ui8 i = 0; i < numParts; ++i) {
- if (mask1 >> i & 1) {
- NMatrix::TVectorType v(0, numParts);
- v.Set(i);
- TRope buffer = TDiskBlob::Create(8, i + 1, numParts, TRope(Sprintf("%08x", i)), Arena, addHeader1);
- m.Add(TDiskBlob(&buffer, v, GType, TLogoBlobID(0, 0, 0, 0, 8, 0)));
- }
- }
-
- TDiskBlobMerger m2;
- for (ui8 i = 0; i < numParts; ++i) {
- if ((mask1 & mask2) >> i & 1) {
- NMatrix::TVectorType v(0, numParts);
- v.Set(i);
- TRope buffer = TDiskBlob::Create(8, i + 1, numParts, TRope(Sprintf("%08x", i)), Arena, addHeader2);
- m2.Add(TDiskBlob(&buffer, v, GType, TLogoBlobID(0, 0, 0, 0, 8, 0)));
- }
- }
-
- UNIT_ASSERT_EQUAL(m.CreateDiskBlob(Arena, addHeader3), m2.CreateDiskBlob(Arena, addHeader3));
- }
- }
- }
- }
- }
- }
}
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp
index ad5f8d223f0..f73a386a865 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp
+++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.cpp
@@ -88,8 +88,9 @@ namespace NKikimr {
THullCtx::THullCtx(TVDiskContextPtr vctx, ui32 chunkSize, ui32 compWorthReadSize, bool freshCompaction,
bool gcOnlySynced, bool allowKeepFlags, bool barrierValidation, ui32 hullSstSizeInChunksFresh,
ui32 hullSstSizeInChunksLevel, double hullCompFreeSpaceThreshold, ui32 freshCompMaxInFlightWrites,
- ui32 hullCompMaxInFlightWrites, ui32 hullCompMaxInFlightReads, double hullCompReadBatchEfficiencyThreshold,
- TDuration hullCompStorageRatioCalcPeriod, TDuration hullCompStorageRatioMaxCalcDuration, bool addHeader)
+ ui32 freshCompMaxInFlightReads, ui32 hullCompMaxInFlightWrites, ui32 hullCompMaxInFlightReads,
+ double hullCompReadBatchEfficiencyThreshold, TDuration hullCompStorageRatioCalcPeriod,
+ TDuration hullCompStorageRatioMaxCalcDuration, bool addHeader)
: VCtx(std::move(vctx))
, IngressCache(TIngressCache::Create(VCtx->Top, VCtx->ShortSelfVDisk))
, ChunkSize(chunkSize)
@@ -102,6 +103,7 @@ namespace NKikimr {
, HullSstSizeInChunksLevel(hullSstSizeInChunksLevel)
, HullCompFreeSpaceThreshold(hullCompFreeSpaceThreshold)
, FreshCompMaxInFlightWrites(freshCompMaxInFlightWrites)
+ , FreshCompMaxInFlightReads(freshCompMaxInFlightReads)
, HullCompMaxInFlightWrites(hullCompMaxInFlightWrites)
, HullCompMaxInFlightReads(hullCompMaxInFlightReads)
, HullCompReadBatchEfficiencyThreshold(hullCompReadBatchEfficiencyThreshold)
diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h
index 72406faee5a..d21c5096e3a 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h
@@ -132,6 +132,7 @@ namespace NKikimr {
const ui32 HullSstSizeInChunksLevel;
const double HullCompFreeSpaceThreshold;
const ui32 FreshCompMaxInFlightWrites;
+ const ui32 FreshCompMaxInFlightReads;
const ui32 HullCompMaxInFlightWrites;
const ui32 HullCompMaxInFlightReads;
const double HullCompReadBatchEfficiencyThreshold;
@@ -154,6 +155,7 @@ namespace NKikimr {
ui32 hullSstSizeInChunksLevel,
double hullCompFreeSpaceThreshold,
ui32 freshCompMaxInFlightWrites,
+ ui32 freshCompMaxInFlightReads,
ui32 hullCompMaxInFlightWrites,
ui32 hullCompMaxInFlightReads,
double hullCompReadBatchEfficiencyThreshold,
diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h
index 55ec8622805..604a0aa3162 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_barrier.h
@@ -71,11 +71,6 @@ namespace NKikimr {
TabletId, Channel, Gen, GenCounter, Hard ? "hard" : "soft");
}
- TLogoBlobID LogoBlobID() const {
- return TLogoBlobID();
- }
-
-
void Serialize(NKikimrBlobStorage::TBarrierKey &proto) const {
proto.SetTabletId(TabletId);
proto.SetChannel(Channel);
diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h
index 0e804112870..89f4145164a 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/base/hullbase_block.h
@@ -33,10 +33,6 @@ namespace NKikimr {
return Sprintf("[%16" PRIu64 "]", TabletId);
}
- TLogoBlobID LogoBlobID() const {
- return TLogoBlobID();
- }
-
static TKeyBlock First() {
return TKeyBlock();
}
diff --git a/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h b/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h
index c2777d6ee9c..84916881f01 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/base/hullds_ut.h
@@ -27,6 +27,7 @@ namespace NKikimr {
1, // HullSstSizeInChunksLevel
2.0,
10, // FreshCompMaxInFlightWrites
+ 10, // FreshCompMaxInFlightReads
10, // HullCompMaxInFlightWrites
20, // HullCompMaxInFlightReads
0.5,
diff --git a/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h b/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h
index 2c579e4260e..894fbf9fcb4 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/compstrat/hulldb_compstrat_defs.h
@@ -74,6 +74,7 @@ namespace NKikimr {
TLeveledSsts TablesToAdd;
// huge blobs to delete
TDiskPartVec HugeBlobsToDelete;
+ TDiskPartVec HugeBlobsAllocated;
// is data finalized
bool Finalized = false;
@@ -101,6 +102,11 @@ namespace NKikimr {
return HugeBlobsToDelete;
}
+ const TDiskPartVec &GetHugeBlobsAllocated() const {
+ Y_ABORT_UNLESS(Finalized);
+ return HugeBlobsAllocated;
+ }
+
TDiskPartVec ExtractHugeBlobsToDelete() {
Y_ABORT_UNLESS(Finalized);
return std::move(HugeBlobsToDelete);
@@ -202,6 +208,7 @@ namespace NKikimr {
using TBase::TablesToDelete;
using TBase::TablesToAdd;
using TBase::HugeBlobsToDelete;
+ using TBase::HugeBlobsAllocated;
ui32 TargetLevel = (ui32)(-1);
TKey LastCompactedKey = TKey::First();
@@ -241,7 +248,8 @@ namespace NKikimr {
void CompactionFinished(
TOrderedLevelSegmentsPtr &&segVec,
- TDiskPartVec &&hugeBlobsToDelete,
+ TDiskPartVec&& hugeBlobsToDelete,
+ TDiskPartVec&& hugeBlobsAllocated,
bool aborted)
{
if (aborted) {
@@ -249,9 +257,11 @@ namespace NKikimr {
TablesToDelete.Clear();
TablesToAdd.Clear();
HugeBlobsToDelete.Clear();
+ HugeBlobsAllocated.Clear();
} else {
Y_ABORT_UNLESS(!TablesToDelete.Empty());
HugeBlobsToDelete = std::move(hugeBlobsToDelete);
+ HugeBlobsAllocated = std::move(hugeBlobsAllocated);
if (segVec) {
TLeveledSsts tmp(TargetLevel, *segVec);
TablesToAdd.Swap(tmp);
@@ -316,6 +326,10 @@ namespace NKikimr {
return GetPtr()->GetHugeBlobsToDelete();
}
+ const TDiskPartVec &GetHugeBlobsAllocated() const {
+ return GetPtr()->GetHugeBlobsAllocated();
+ }
+
TDiskPartVec ExtractHugeBlobsToDelete() {
return const_cast<TBase*>(GetPtr())->ExtractHugeBlobsToDelete();
}
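The compaction task now records the freshly allocated huge-blob references alongside the ones scheduled for deletion, and an aborted compaction drops both. A minimal sketch of that bookkeeping under assumed, simplified types (TDiskPartStub and TCompactionResultSketch are illustrative only):

    #include <cassert>
    #include <utility>
    #include <vector>

    struct TDiskPartStub { unsigned Chunk = 0, Offset = 0, Size = 0; };

    struct TCompactionResultSketch {
        std::vector<TDiskPartStub> HugeBlobsToDelete;
        std::vector<TDiskPartStub> HugeBlobsAllocated;

        void CompactionFinished(std::vector<TDiskPartStub>&& toDelete,
                                std::vector<TDiskPartStub>&& allocated,
                                bool aborted) {
            if (aborted) {                      // aborted compaction must not change the index
                HugeBlobsToDelete.clear();
                HugeBlobsAllocated.clear();
            } else {
                HugeBlobsToDelete = std::move(toDelete);
                HugeBlobsAllocated = std::move(allocated);
            }
        }
    };

    int main() {
        TCompactionResultSketch r;
        r.CompactionFinished({{1, 0, 64}}, {{2, 0, 64}}, /*aborted=*/false);
        assert(r.HugeBlobsToDelete.size() == 1 && r.HugeBlobsAllocated.size() == 1);
    }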
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h
index adbca0fa8f2..ab7282f0c82 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hulldatamerger.h
@@ -3,101 +3,351 @@
#include "defs.h"
#include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hulldefs.h>
#include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h>
+#include <ydb/core/blobstorage/vdisk/hulldb/base/hullbase_logoblob.h>
#include <ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h>
#include <util/generic/noncopyable.h>
namespace NKikimr {
- ////////////////////////////////////////////////////////////////////////////
- // TDataMerger
- ////////////////////////////////////////////////////////////////////////////
class TDataMerger : TNonCopyable {
+ public:
+ struct TCollectTask {
+ TDiskBlobMerger BlobMerger; // base blob for compaction (obtained from in-memory records)
+ std::vector<std::tuple<TDiskPart, ui8>> Reads; // a set of extra reads _of distinct parts, not blobs_
+
+ void Clear() {
+ BlobMerger.Clear();
+ Reads.clear();
+ }
+ };
+
+ struct THugeBlobWrite {
+ ui8 PartIdx;
+ const TRope *From;
+ TDiskPart To;
+ };
+
+ struct THugeBlobMove {
+ ui8 PartIdx;
+ TDiskPart From;
+ TDiskPart To;
+ };
+
private:
- TDiskBlobMerger DiskBlobMerger;
- NHuge::TBlobMerger HugeBlobMerger;
+ struct TPart {
+ const TRope *InMemData = nullptr;
+ TDiskPart SmallBlobPart; // a reference to part data, without any headers
+ TDiskPart HugeBlob; // a reference to the whole huge blob, with headers added
+ ui64 HugeBlobCircaLsn = 0;
+ ui32 HugePartSize = 0;
+ bool IsMetadataPart = false;
+
+ bool NeedHugeSlot() const {
+ return HugeBlob.Empty() && (InMemData || !SmallBlobPart.Empty());
+ }
+ };
+
+ // immutable fields
+ TBlobStorageGroupType GType;
+ bool AddHeader = false;
+
+ // clearable fields
+ std::vector<TPart> Parts;
+ std::vector<TDiskPart> DeletedHugeBlobs;
+ NMatrix::TVectorType PartsMask; // mask of all present parts
+ std::vector<TDiskPart> SavedHugeBlobs;
+ bool Finished = false;
+ std::vector<ui32> SlotsToAllocate;
+ TCollectTask CollectTask;
+ std::vector<THugeBlobWrite> HugeBlobWrites;
+ std::vector<THugeBlobMove> HugeBlobMoves;
public:
- TDataMerger()
- : DiskBlobMerger()
- , HugeBlobMerger()
+ TDataMerger(TBlobStorageGroupType gtype, bool addHeader)
+ : GType(gtype)
+ , AddHeader(addHeader)
+ , Parts(GType.TotalPartCount())
+ , PartsMask(0, GType.TotalPartCount())
{}
bool Empty() const {
- return DiskBlobMerger.Empty() && HugeBlobMerger.Empty();
+ return PartsMask.Empty();
}
void Clear() {
- DiskBlobMerger.Clear();
- HugeBlobMerger.Clear();
+ std::ranges::fill(Parts, TPart());
+ DeletedHugeBlobs.clear();
+ PartsMask.Clear();
+ SavedHugeBlobs.clear();
+ Finished = false;
+ SlotsToAllocate.clear();
+ CollectTask.Clear();
+ HugeBlobWrites.clear();
+ HugeBlobMoves.clear();
}
- void Swap(TDataMerger &m) {
- DiskBlobMerger.Swap(m.DiskBlobMerger);
- HugeBlobMerger.Swap(m.HugeBlobMerger);
- }
+ void Add(const TMemRecLogoBlob& memRec, std::variant<const TRope*, const TDiskPart*> dataOrOutbound,
+ ui64 circaLsn, const TLogoBlobID& fullId) {
+ const NMatrix::TVectorType parts = memRec.GetLocalParts(GType);
+
+ if (memRec.GetType() == TBlobType::MemBlob) {
+ auto data = std::visit<const TRope*>(TOverloaded{
+ [&](const TRope *x) { return x; },
+ [&](const TDiskPart*) { return nullptr; }
+ }, dataOrOutbound);
- TBlobType::EType GetType() const {
- Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty() || DiskBlobMerger.Empty());
- if (!HugeBlobMerger.Empty()) {
- return HugeBlobMerger.GetBlobType();
+ Y_ABORT_UNLESS(data);
+ Y_DEBUG_ABORT_UNLESS(parts.CountBits() == 1); // only single part per record in memory
+ for (ui8 partIdx : parts) {
+ Y_DEBUG_ABORT_UNLESS(partIdx < Parts.size());
+ Parts[partIdx].InMemData = data;
+ PartsMask.Set(partIdx);
+ }
} else {
- // both for !DiskBlobMerger.Empty() and DiskBlobMerger.Empty()
- return TBlobType::DiskBlob;
+ auto outbound = std::visit<const TDiskPart*>(TOverloaded{
+ [&](const TRope *x) { Y_DEBUG_ABORT_UNLESS(!x); return nullptr; },
+ [&](const TDiskPart *x) { return x; }
+ }, dataOrOutbound);
+
+ TDiskDataExtractor extr;
+ memRec.GetDiskData(&extr, outbound);
+
+ if (memRec.GetType() == TBlobType::DiskBlob) {
+ const TDiskPart& location = extr.SwearOne();
+ ui32 offset = AddHeader ? TDiskBlob::HeaderSize : 0;
+ for (ui8 partIdx : parts) {
+ const ui32 partSize = GType.PartSize(TLogoBlobID(fullId, partIdx + 1));
+ Y_DEBUG_ABORT_UNLESS(partIdx < Parts.size());
+ if (partSize) {
+ Parts[partIdx].SmallBlobPart = {location.ChunkIdx, location.Offset + offset, partSize};
+ }
+ offset += partSize;
+ PartsMask.Set(partIdx);
+ }
+ } else {
+ AddHugeBlob(extr.Begin, extr.End, parts, circaLsn);
+ }
}
}
- ui32 GetInplacedSize(bool addHeader) const {
- Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty() || DiskBlobMerger.Empty());
- return HugeBlobMerger.Empty() ? DiskBlobMerger.GetDiskBlob().GetBlobSize(addHeader) : 0;
- }
+ void Finish(bool targetingHugeBlob, const TLogoBlobID& fullId, TBlobType::EType *type, ui32 *inplacedDataSize) {
+ Y_DEBUG_ABORT_UNLESS(!Finished);
+
+ if (!Empty()) {
+ // scan through all the parts, see what we got
+ NMatrix::TVectorType inMemParts(0, GType.TotalPartCount());
+ NMatrix::TVectorType smallDiskParts(0, GType.TotalPartCount());
+ NMatrix::TVectorType hugeDiskParts(0, GType.TotalPartCount());
+ for (ui8 partIdx : PartsMask) {
+ TPart& part = Parts[partIdx];
+ if (part.InMemData) {
+ inMemParts.Set(partIdx);
+ }
+ if (!part.SmallBlobPart.Empty()) {
+ smallDiskParts.Set(partIdx);
+ }
+ if (!part.HugeBlob.Empty()) {
+ hugeDiskParts.Set(partIdx);
+ }
+ }
+
+ bool producingHugeBlob = false;
+
+ if (inMemParts.Empty() && smallDiskParts.Empty()) { // we only have huge blobs, so keep it this way
+ producingHugeBlob = true;
+ } else {
+ producingHugeBlob = targetingHugeBlob;
+ }
- void AddHugeBlob(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType &parts,
- ui64 circaLsn) {
- Y_DEBUG_ABORT_UNLESS(DiskBlobMerger.Empty());
- HugeBlobMerger.Add(begin, end, parts, circaLsn);
+ // calculate the blob size in case we are going to keep it inplace
+ *inplacedDataSize = producingHugeBlob
+ ? 0
+ : TDiskBlob::CalculateBlobSize(GType, fullId, PartsMask, AddHeader);
+
+ TDiskBlobMerger merger;
+
+ for (ui8 partIdx : PartsMask) {
+ TPart& part = Parts[partIdx];
+ const ui32 partSize = GType.PartSize(TLogoBlobID(fullId, partIdx + 1));
+ part.IsMetadataPart = !partSize;
+ const NMatrix::TVectorType partMask = NMatrix::TVectorType::MakeOneHot(partIdx, GType.TotalPartCount());
+
+ if (producingHugeBlob) {
+ SavedHugeBlobs.push_back(part.HugeBlob);
+ if (!part.IsMetadataPart && part.NeedHugeSlot()) {
+ part.HugePartSize = TDiskBlob::CalculateBlobSize(GType, fullId, partMask, AddHeader);
+ SlotsToAllocate.push_back(part.HugePartSize);
+ }
+ } else {
+ if (part.InMemData) { // prefer in-memory data if we have options
+ merger.Add(TDiskBlob(part.InMemData, partMask, GType, fullId));
+ } else if (!part.SmallBlobPart.Empty()) {
+ CollectTask.Reads.emplace_back(part.SmallBlobPart, partIdx);
+ } else if (!part.HugeBlob.Empty()) { // dropping this huge part after compaction
+ TDiskPart location;
+ if (part.HugeBlob.Size == partSize) {
+ location = part.HugeBlob;
+ } else if (part.HugeBlob.Size == partSize + TDiskBlob::HeaderSize) {
+ location = TDiskPart(part.HugeBlob.ChunkIdx, part.HugeBlob.Offset + TDiskBlob::HeaderSize,
+ part.HugeBlob.Size - TDiskBlob::HeaderSize);
+ } else {
+ Y_ABORT("incorrect huge blob size");
+ }
+ CollectTask.Reads.emplace_back(location, partIdx);
+ DeletedHugeBlobs.push_back(part.HugeBlob);
+ } else { // add metadata part to merger
+ merger.AddPart(TRope(), GType, TLogoBlobID(fullId, partIdx + 1));
+ }
+ }
+ }
+
+ Y_DEBUG_ABORT_UNLESS(!producingHugeBlob || merger.Empty());
+ CollectTask.BlobMerger = merger;
+
+ *type = !producingHugeBlob ? TBlobType::DiskBlob :
+ PartsMask.CountBits() > 1 ? TBlobType::ManyHugeBlobs : TBlobType::HugeBlob;
+
+ Y_DEBUG_ABORT_UNLESS(SavedHugeBlobs.size() == (producingHugeBlob ? PartsMask.CountBits() : 0));
+ }
+
+ Finished = true;
}
- void AddDeletedHugeBlob(const TDiskPart& part) {
- HugeBlobMerger.AddDeletedPart(part);
+ void FinishFromBlob() {
+ Y_DEBUG_ABORT_UNLESS(!Finished);
+ Finished = true;
}
- void AddBlob(const TDiskBlob &addBlob) {
- Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty());
- DiskBlobMerger.Add(addBlob);
+ void AddHugeBlob(const TDiskPart *begin, const TDiskPart *end, const NMatrix::TVectorType& parts, ui64 circaLsn) {
+ Y_DEBUG_ABORT_UNLESS(parts.CountBits() == end - begin);
+ const TDiskPart *location = begin;
+ for (ui8 partIdx : parts) {
+ Y_DEBUG_ABORT_UNLESS(partIdx < Parts.size());
+ auto& part = Parts[partIdx];
+ if (location->ChunkIdx) { // consider only data parts
+ if (part.HugeBlob.Empty() || part.HugeBlobCircaLsn < circaLsn) {
+ if (!part.HugeBlob.Empty()) {
+ DeletedHugeBlobs.push_back(part.HugeBlob);
+ }
+ part.HugeBlob = *location;
+ part.HugeBlobCircaLsn = circaLsn;
+ } else {
+ DeletedHugeBlobs.push_back(*location);
+ }
+ }
+ ++location;
+ }
+ Y_DEBUG_ABORT_UNLESS(location == end);
+ PartsMask |= parts;
}
- void AddPart(const TDiskBlob& source, const TDiskBlob::TPartIterator& iter) {
- Y_DEBUG_ABORT_UNLESS(HugeBlobMerger.Empty());
- DiskBlobMerger.AddPart(source, iter);
+ template<typename T>
+ T& CheckFinished(T& value) const {
+ Y_DEBUG_ABORT_UNLESS(Finished);
+ return value;
}
- void SetEmptyFromAnotherMerger(const TDataMerger *dataMerger) {
- DiskBlobMerger.Clear();
- HugeBlobMerger.SetEmptyFromAnotherMerger(&dataMerger->HugeBlobMerger);
+ const NMatrix::TVectorType GetParts() const { return CheckFinished(PartsMask); }
+ const std::vector<TDiskPart>& GetSavedHugeBlobs() const { return CheckFinished(SavedHugeBlobs); }
+ const std::vector<TDiskPart>& GetDeletedHugeBlobs() const { return CheckFinished(DeletedHugeBlobs); }
+ const TCollectTask& GetCollectTask() const { return CheckFinished(CollectTask); }
+ const auto& GetHugeBlobWrites() const { return CheckFinished(HugeBlobWrites); }
+ const auto& GetHugeBlobMoves() const { return CheckFinished(HugeBlobMoves); }
+
+ bool Ready() const {
+ Y_DEBUG_ABORT_UNLESS(Finished);
+ return CollectTask.Reads.empty()
+ && HugeBlobWrites.empty()
+ && HugeBlobMoves.empty();
}
- bool HasSmallBlobs() const {
- return !DiskBlobMerger.Empty();
+ TRope CreateDiskBlob(TRopeArena& arena) {
+ Y_DEBUG_ABORT_UNLESS(Finished);
+ Y_DEBUG_ABORT_UNLESS(!CollectTask.BlobMerger.Empty());
+ return CollectTask.BlobMerger.CreateDiskBlob(arena, AddHeader);
}
- ui32 GetDiskBlobRawSize(bool addHeader) const {
- Y_DEBUG_ABORT_UNLESS(!DiskBlobMerger.Empty());
- return DiskBlobMerger.GetDiskBlob().GetBlobSize(addHeader);
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Huge slot operations
+
+ std::vector<ui32>& GetSlotsToAllocate() {
+ Y_DEBUG_ABORT_UNLESS(Finished);
+ return SlotsToAllocate;
}
- const TDiskBlobMerger &GetDiskBlobMerger() const {
- return DiskBlobMerger;
+ void ApplyAllocatedSlots(std::vector<TDiskPart>& allocatedSlots) {
+ size_t savedIndex = 0;
+ size_t index = 0;
+ for (ui8 partIdx : PartsMask) {
+ if (const TPart& part = Parts[partIdx]; !part.HugeBlob.Empty()) {
+ Y_DEBUG_ABORT_UNLESS(part.HugeBlob == SavedHugeBlobs[savedIndex]);
+ ++savedIndex;
+ } else if (!part.IsMetadataPart && part.NeedHugeSlot()) {
+ TDiskPart& location = SavedHugeBlobs[savedIndex++] = allocatedSlots[index++];
+ Y_ABORT_UNLESS(part.HugePartSize <= location.Size);
+ location.Size = part.HugePartSize;
+ if (part.InMemData) {
+ HugeBlobWrites.push_back({
+ .PartIdx = partIdx,
+ .From = part.InMemData,
+ .To = location,
+ });
+ } else if (!part.SmallBlobPart.Empty()) {
+ HugeBlobMoves.push_back({
+ .PartIdx = partIdx,
+ .From = part.SmallBlobPart,
+ .To = location,
+ });
+ } else {
+ Y_ABORT("impossible case");
+ }
+ }
+ }
+ Y_DEBUG_ABORT_UNLESS(savedIndex == SavedHugeBlobs.size());
+ Y_DEBUG_ABORT_UNLESS(index == allocatedSlots.size());
}
- const NHuge::TBlobMerger &GetHugeBlobMerger() const {
- return HugeBlobMerger;
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // THandoffMap transformations; called before any reads are done, metadata-only processing (but within Fresh
+ // compaction some data may already be available)
+
+ void MakeEmpty() {
+ Y_DEBUG_ABORT_UNLESS(Finished);
+ DeletedHugeBlobs.insert(DeletedHugeBlobs.end(), SavedHugeBlobs.begin(), SavedHugeBlobs.end());
+ auto deletedHugeBlobs = std::move(DeletedHugeBlobs);
+ Clear();
+ DeletedHugeBlobs = std::move(deletedHugeBlobs);
+ Finished = true;
}
- TString ToString() const {
- TStringStream str;
- str << "{DiskBlobMerger# " << DiskBlobMerger.ToString();
- str << " HugeBlobMerger# " << HugeBlobMerger.ToString() << "}";
- return str.Str();
+ void FilterLocalParts(NMatrix::TVectorType remainingLocalParts, const TLogoBlobID& fullId,
+ TBlobType::EType *type, ui32 *inplacedDataSize) {
+ Y_DEBUG_ABORT_UNLESS(Finished);
+ const NMatrix::TVectorType partsToRemove = PartsMask & ~remainingLocalParts;
+ for (ui8 partIdx : partsToRemove) { // local parts to remove
+ if (const auto& part = Parts[partIdx].HugeBlob; !part.Empty()) {
+ DeletedHugeBlobs.push_back(part);
+ const auto it = std::remove(SavedHugeBlobs.begin(), SavedHugeBlobs.end(), part);
+ Y_ABORT_UNLESS(std::next(it) == SavedHugeBlobs.end());
+ SavedHugeBlobs.pop_back();
+ } else if (Parts[partIdx].InMemData) {
+ CollectTask.BlobMerger.ClearPart(partIdx);
+ }
+ Parts[partIdx] = {};
+ }
+
+ auto pred = [&](const std::tuple<TDiskPart, ui8>& x) { return partsToRemove.Get(std::get<1>(x)); };
+ CollectTask.Reads.erase(std::remove_if(CollectTask.Reads.begin(), CollectTask.Reads.end(), pred),
+ CollectTask.Reads.end());
+
+ PartsMask &= remainingLocalParts;
+
+ // recalculate memrec data for this new entry
+ *type = SavedHugeBlobs.empty() ? TBlobType::DiskBlob :
+ SavedHugeBlobs.size() == 1 ? TBlobType::HugeBlob : TBlobType::ManyHugeBlobs;
+ if (!Empty() && *type == TBlobType::DiskBlob) {
+ *inplacedDataSize = TDiskBlob::CalculateBlobSize(GType, fullId, PartsMask, AddHeader);
+ }
}
};
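The rewritten TDataMerger tracks every local part separately and, when the record is being converted into a huge blob, requests a slot only for parts that are not already stored as huge blobs. A simplified standalone sketch of that per-part decision (hypothetical names; the real code additionally handles metadata parts, circaLsn conflicts and header sizes):

    #include <cassert>
    #include <cstdint>
    #include <optional>
    #include <string>
    #include <vector>

    struct TPartStateSketch {
        std::optional<std::string> InMemData;   // data held in memory (fresh segment)
        bool HasSmallBlobPart = false;          // stored inline in an SST data chunk
        bool HasHugeBlob = false;               // already stored as a huge blob

        bool NeedHugeSlot() const {
            return !HasHugeBlob && (InMemData || HasSmallBlobPart);
        }
    };

    // Returns sizes of the slots that must be allocated before the compaction can write.
    std::vector<uint32_t> SlotsToAllocate(const std::vector<TPartStateSketch>& parts,
                                          uint32_t partSize, bool producingHugeBlob) {
        std::vector<uint32_t> slots;
        if (!producingHugeBlob)
            return slots;                       // data stays inplace, no slots needed
        for (const auto& p : parts)
            if (p.NeedHugeSlot())
                slots.push_back(partSize);
        return slots;
    }

    int main() {
        std::vector<TPartStateSketch> parts(3);
        parts[0].InMemData = "payload";         // fresh part, needs a slot
        parts[1].HasHugeBlob = true;            // already huge, keeps its location
        assert(SlotsToAllocate(parts, 4096, true).size() == 1);
        assert(SlotsToAllocate(parts, 4096, false).empty());
    }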
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
index cf6eba18ab9..077393edf80 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
@@ -123,163 +123,74 @@ namespace NKikimr {
using TBase::Empty;
using TBase::AddBasic;
- enum class ELoadData {
- NotSet = 0,
- LoadData = 1,
- DontLoadData = 2
- };
-
public:
- TCompactRecordMerger(const TBlobStorageGroupType &gtype, bool addHeader)
+ TCompactRecordMerger(TBlobStorageGroupType gtype, bool addHeader)
: TBase(gtype, true)
, AddHeader(addHeader)
+ , DataMerger(gtype, addHeader)
{}
void Clear() {
TBase::Clear();
- MemRecs.clear();
- ProducingSmallBlob = false;
- ProducingHugeBlob = false;
- NeedToLoadData = ELoadData::NotSet;
DataMerger.Clear();
+ Key = {};
}
using TBase::GetMemRec;
using TBase::HaveToMergeData;
- void SetLoadDataMode(bool loadData) {
- NeedToLoadData = loadData ? ELoadData::LoadData : ELoadData::DontLoadData;
- }
-
void AddFromSegment(const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
- Add(memRec, nullptr, outbound, key, circaLsn);
+ Add(memRec, outbound, key, circaLsn);
}
void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
- Add(memRec, data, nullptr, key, lsn);
+ Add(memRec, data, key, lsn);
}
- void Add(const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
- Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
+ void Add(const TMemRec& memRec, std::variant<const TRope*, const TDiskPart*> dataOrOutbound, const TKey& key, ui64 lsn) {
+ Y_DEBUG_ABORT_UNLESS(Key == TKey{} || Key == key);
+ Key = key;
AddBasic(memRec, key);
- if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
- TDiskDataExtractor extr;
- switch (memRec.GetType()) {
- case TBlobType::MemBlob:
- case TBlobType::DiskBlob:
- if (NeedToLoadData == ELoadData::LoadData) {
- if (data) {
- // we have some data in-memory
- DataMerger.AddBlob(TDiskBlob(data, local, GType, key.LogoBlobID()));
- }
- if (memRec.GetType() == TBlobType::DiskBlob) {
- if (memRec.HasData()) { // there is something to read from the disk
- MemRecs.push_back(memRec);
- } else { // headerless metadata stored
- static TRope emptyRope;
- DataMerger.AddBlob(TDiskBlob(&emptyRope, local, GType, key.LogoBlobID()));
- }
- }
- Y_DEBUG_ABORT_UNLESS(!ProducingHugeBlob);
- ProducingSmallBlob = true;
- }
- break;
-
- case TBlobType::ManyHugeBlobs:
- Y_ABORT_UNLESS(outbound);
- [[fallthrough]];
- case TBlobType::HugeBlob:
- memRec.GetDiskData(&extr, outbound);
- DataMerger.AddHugeBlob(extr.Begin, extr.End, local, lsn);
- Y_DEBUG_ABORT_UNLESS(!ProducingSmallBlob);
- ProducingHugeBlob = true;
- break;
- }
+ if constexpr (std::is_same_v<TKey, TKeyLogoBlob>) {
+ DataMerger.Add(memRec, dataOrOutbound, lsn, key.LogoBlobID());
}
- VerifyConsistency(memRec, outbound);
}
- void VerifyConsistency(const TMemRec& memRec, const TDiskPart *outbound) {
-#ifdef NDEBUG
- return;
-#endif
- if constexpr (std::is_same_v<TMemRec, TMemRecLogoBlob>) {
- if (const auto m = memRec.GetType(); m == TBlobType::HugeBlob || m == TBlobType::ManyHugeBlobs) {
- TDiskDataExtractor extr;
- memRec.GetDiskData(&extr, outbound);
- Y_ABORT_UNLESS(extr.End - extr.Begin == memRec.GetIngress().LocalParts(GType).CountBits());
- }
- switch (memRec.GetType()) {
- case TBlobType::DiskBlob:
- Y_ABORT_UNLESS(!memRec.HasData() || DataMerger.GetHugeBlobMerger().Empty());
- break;
-
- case TBlobType::HugeBlob:
- case TBlobType::ManyHugeBlobs:
- Y_ABORT_UNLESS(memRec.HasData() && DataMerger.GetDiskBlobMerger().Empty());
- break;
-
- case TBlobType::MemBlob:
- Y_ABORT_UNLESS(memRec.HasData() && DataMerger.GetHugeBlobMerger().Empty());
- break;
- }
- VerifyConsistency();
- }
- }
+ void Finish(bool targetingHugeBlob) {
+ Y_DEBUG_ABORT_UNLESS(!Finished);
+ Y_DEBUG_ABORT_UNLESS(!Empty());
- void VerifyConsistency() {
- if constexpr (std::is_same_v<TMemRec, TMemRecLogoBlob>) {
- Y_DEBUG_ABORT_UNLESS(DataMerger.GetHugeBlobMerger().Empty() || MemRec.GetIngress().LocalParts(GType).CountBits() ==
- DataMerger.GetHugeBlobMerger().GetNumParts());
- }
- }
+ MemRec.SetNoBlob();
- void Finish() {
- if (NeedToLoadData == ELoadData::DontLoadData) {
- Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
- // if we have huge blobs for the record, than we set TBlobType::HugeBlob or
- // TBlobType::ManyHugeBlobs a few lines below
- MemRec.SetNoBlob();
- }
+ if constexpr (std::is_same_v<TKey, TKeyLogoBlob>) {
+ TBlobType::EType type = TBlobType::DiskBlob;
+ ui32 inplacedDataSize = 0;
+ DataMerger.Finish(targetingHugeBlob, Key.LogoBlobID(), &type, &inplacedDataSize);
- Y_DEBUG_ABORT_UNLESS(!Empty());
- VerifyConsistency();
+ const NMatrix::TVectorType localParts = MemRec.GetLocalParts(GType);
+ Y_ABORT_UNLESS(localParts == DataMerger.GetParts());
- // in case when we keep data and disk merger contains small blobs, we set up small blob record -- this logic
- // is used in replication and in fresh compaction
- if (NeedToLoadData == ELoadData::LoadData && DataMerger.HasSmallBlobs()) {
- TDiskPart addr(0, 0, DataMerger.GetDiskBlobRawSize(AddHeader));
- MemRec.SetDiskBlob(addr);
- }
+ MemRec.SetType(type);
- // clear local bits if we are not going to keep item's data
- if (NeedToLoadData == ELoadData::DontLoadData) {
- MemRec.ClearLocalParts(GType);
+ if (type == TBlobType::DiskBlob && inplacedDataSize) {
+ Y_ABORT_UNLESS(inplacedDataSize == TDiskBlob::CalculateBlobSize(GType, Key.LogoBlobID(),
+ localParts, AddHeader));
+ MemRec.SetDiskBlob(TDiskPart(0, 0, inplacedDataSize));
+ }
}
Finished = true;
- MemRec.SetType(DataMerger.GetType());
}
- const TDataMerger *GetDataMerger() const {
+ TDataMerger& GetDataMerger() {
Y_DEBUG_ABORT_UNLESS(Finished);
- return &DataMerger;
- }
-
- template<typename TCallback>
- void ForEachSmallDiskBlob(TCallback&& callback) {
- for (const auto& memRec : MemRecs) {
- callback(memRec);
- }
+ return DataMerger;
}
protected:
- TStackVec<TMemRec, 16> MemRecs;
- bool ProducingSmallBlob = false;
- bool ProducingHugeBlob = false;
- ELoadData NeedToLoadData = ELoadData::NotSet;
- TDataMerger DataMerger;
const bool AddHeader;
+ TDataMerger DataMerger;
+ TKey Key;
};
////////////////////////////////////////////////////////////////////////////
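After Finish(), the record type is derived purely from how many huge-blob locations survived the merge. A tiny sketch of that mapping (illustrative enum, not the real TBlobType):

    #include <cassert>
    #include <cstddef>

    // No saved huge blobs -> inplace DiskBlob, exactly one -> HugeBlob,
    // more than one -> ManyHugeBlobs.
    enum class EBlobTypeSketch { DiskBlob, HugeBlob, ManyHugeBlobs };

    EBlobTypeSketch DeriveType(std::size_t savedHugeBlobs) {
        if (savedHugeBlobs == 0) return EBlobTypeSketch::DiskBlob;
        if (savedHugeBlobs == 1) return EBlobTypeSketch::HugeBlob;
        return EBlobTypeSketch::ManyHugeBlobs;
    }

    int main() {
        assert(DeriveType(0) == EBlobTypeSketch::DiskBlob);
        assert(DeriveType(1) == EBlobTypeSketch::HugeBlob);
        assert(DeriveType(3) == EBlobTypeSketch::ManyHugeBlobs);
    }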
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h
index 748f6d56f7c..b0f9608fb33 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h
@@ -507,49 +507,40 @@ namespace NKikimr {
// check that keys are coming in strictly ascending order
Y_ABORT_UNLESS(Recs.empty() || Recs.back().Key < key);
- switch (memRec.GetType()) {
- case TBlobType::DiskBlob: {
+ TMemRec newMemRec(memRec);
+
+ switch (const TBlobType::EType type = memRec.GetType()) {
+ case TBlobType::DiskBlob:
InplaceDataTotalSize += memRec.DataSize();
ItemsWithInplacedData += !!memRec.DataSize();
- Recs.push_back(TRec(key, memRec));
break;
- }
- case TBlobType::HugeBlob: {
- const TVector<TDiskPart> &saved = dataMerger->GetHugeBlobMerger().SavedData();
- Y_ABORT_UNLESS(saved.size() == 1);
- TMemRec memRecTmp(memRec);
- memRecTmp.SetHugeBlob(saved.at(0));
- HugeDataTotalSize += memRecTmp.DataSize();
- ItemsWithHugeData++;
- Recs.push_back(TRec(key, memRecTmp));
- break;
- }
+ case TBlobType::HugeBlob:
case TBlobType::ManyHugeBlobs: {
- auto beg = dataMerger->GetHugeBlobMerger().SavedData().begin();
- auto end = dataMerger->GetHugeBlobMerger().SavedData().end();
-
- Y_DEBUG_ABORT_UNLESS(beg + 1 < end);
- TMemRec newMemRec(memRec);
- ui32 idx = ui32(Outbound.size());
- ui32 num = ui32(end - beg);
- ui32 size = 0;
- for (auto it = beg; it != end; ++it) {
- size += it->Size;
- }
- newMemRec.SetManyHugeBlobs(idx, num, size);
- for (auto it = beg; it != end; ++it) {
- Outbound.push_back(*it);
+ Y_DEBUG_ABORT_UNLESS(dataMerger);
+ const std::vector<TDiskPart>& saved = dataMerger->GetSavedHugeBlobs();
+
+ if (saved.size() == 1) {
+ newMemRec.SetHugeBlob(saved.front());
+ } else {
+ ui32 size = 0;
+ for (const TDiskPart& part : saved) {
+ size += part.Size;
+ }
+ newMemRec.SetManyHugeBlobs(Outbound.size(), saved.size(), size);
+ Outbound.insert(Outbound.end(), saved.begin(), saved.end());
}
- HugeDataTotalSize += size + sizeof(TDiskPart) * num;
- Recs.push_back(TRec(key, newMemRec));
ItemsWithHugeData++;
+ HugeDataTotalSize += newMemRec.DataSize() + (saved.size() > 1 ? saved.size() * sizeof(TDiskPart) : 0);
break;
}
- default: Y_ABORT("Impossible case");
+
+ default:
+ Y_ABORT("Impossible case");
}
+ Recs.emplace_back(key, newMemRec);
++Items;
}
@@ -831,6 +822,7 @@ namespace NKikimr {
, ChunkSize(chunkSize)
, Arena(arena)
, AddHeader(addHeader)
+ , GType(vctx->Top->GType)
{}
bool Empty() const {
@@ -844,10 +836,12 @@ namespace NKikimr {
return IndexBuilder.GetUsageAfterPush(chunks, intermSize, numAddedOuts) <= ChunksToUse;
}
- bool PushIndexOnly(const TKey& key, const TMemRec& memRec, const TDataMerger *dataMerger, ui32 inplacedDataSize,
- TDiskPart *location) {
+ bool PushIndexOnly(const TKey& key, const TMemRec& memRec, const TDataMerger *dataMerger, TDiskPart *location) {
+ Y_ABORT_UNLESS((std::is_same_v<TKey, TKeyLogoBlob>) == !!dataMerger);
+
// inplacedDataSize must be nonzero for DiskBlob with data and zero in all other cases
- ui32 numAddedOuts = dataMerger->GetHugeBlobMerger().SavedData().size();
+ const ui32 inplacedDataSize = memRec.GetType() == TBlobType::DiskBlob ? memRec.DataSize() : 0;
+ const ui32 numAddedOuts = dataMerger ? dataMerger->GetSavedHugeBlobs().size() : 0;
if (!CheckSpace(inplacedDataSize, numAddedOuts)) {
return false;
}
@@ -867,9 +861,18 @@ namespace NKikimr {
case TBlobType::DiskBlob:
Y_DEBUG_ABORT_UNLESS(numAddedOuts == 0);
+ if constexpr (std::is_same_v<TKey, TKeyLogoBlob>) {
+ const NMatrix::TVectorType localParts = memRec.GetLocalParts(GType);
+ Y_ABORT_UNLESS(inplacedDataSize == (localParts.Empty() ? 0 : TDiskBlob::CalculateBlobSize(GType,
+ key.LogoBlobID(), memRec.GetLocalParts(GType), AddHeader)));
+ } else {
+ Y_ABORT_UNLESS(inplacedDataSize == 0);
+ }
if (inplacedDataSize) {
*location = DataWriter.Preallocate(inplacedDataSize);
memRecToAdd.SetDiskBlob(*location);
+ } else {
+ memRecToAdd.SetNoBlob();
}
break;
@@ -885,32 +888,6 @@ namespace NKikimr {
return DataWriter.Push(std::move(buffer));
}
- // return false on no-more-space
- bool Push(const TKey &key, const TMemRec &memRec, const TDataMerger *dataMerger, ui32 inplacedDataSize = 0) {
- // if this is filled disk blob, then we extract data and calculate inplacedDataSize
- TRope data;
- const auto& diskBlobMerger = dataMerger->GetDiskBlobMerger();
- if (memRec.GetType() == TBlobType::DiskBlob && !diskBlobMerger.Empty()) {
- data = diskBlobMerger.CreateDiskBlob(Arena, AddHeader);
- Y_DEBUG_ABORT_UNLESS(!inplacedDataSize);
- inplacedDataSize = data.GetSize();
- }
-
- TDiskPart preallocatedLocation;
- if (!PushIndexOnly(key, memRec, dataMerger, inplacedDataSize, &preallocatedLocation)) {
- return false;
- }
-
- // write inplace data if we have some
- if (data) {
- Y_DEBUG_ABORT_UNLESS(data.GetSize() == inplacedDataSize);
- TDiskPart writtenLocation = DataWriter.Push(data);
- Y_DEBUG_ABORT_UNLESS(writtenLocation == preallocatedLocation);
- }
-
- return true;
- }
-
// returns true when done, false means 'continue calling me, I'have more chunks to write'
bool FlushNext(ui64 firstLsn, ui64 lastLsn, ui32 maxMsgs) {
if (!DataWriter.IsFinished()) {
@@ -943,6 +920,7 @@ namespace NKikimr {
const ui32 ChunkSize;
TRopeArena& Arena;
const bool AddHeader;
+ const TBlobStorageGroupType GType;
// pending messages
TQueue<std::unique_ptr<NPDisk::TEvChunkWrite>> MsgQueue;
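On the SST-writer side, a record with several surviving huge parts is indexed through the shared outbound vector, while a single huge part stays inline in the record. A simplified sketch of that packing under assumed stub types (not the real TMemRec/TDiskPart API):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    struct TDiskPartStub { uint32_t Chunk = 0, Offset = 0, Size = 0; };

    struct TRecordSketch {
        bool Inline = false;                 // true when the single location is kept inline
        TDiskPartStub InlineLocation;
        uint32_t OutIdx = 0, OutNum = 0, OutSize = 0;
    };

    TRecordSketch IndexHugeBlobs(const std::vector<TDiskPartStub>& saved,
                                 std::vector<TDiskPartStub>& outbound) {
        TRecordSketch rec;
        if (saved.size() == 1) {
            rec.Inline = true;
            rec.InlineLocation = saved.front();
        } else {
            rec.OutIdx = outbound.size();    // record keeps only (index, count, total size)
            rec.OutNum = saved.size();
            for (const auto& p : saved)
                rec.OutSize += p.Size;
            outbound.insert(outbound.end(), saved.begin(), saved.end());
        }
        return rec;
    }

    int main() {
        std::vector<TDiskPartStub> outbound;
        auto one = IndexHugeBlobs({{1, 0, 8}}, outbound);
        assert(one.Inline && outbound.empty());
        auto many = IndexHugeBlobs({{1, 0, 8}, {2, 0, 8}}, outbound);
        assert(!many.Inline && many.OutNum == 2 && outbound.size() == 2);
    }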
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp
index 385996f4735..91121fbaace 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst_ut.cpp
@@ -171,18 +171,24 @@ namespace NKikimr {
memRec.SetDiskBlob(TDiskPart(0, 0, data.size()));
merger.Clear();
- merger.SetLoadDataMode(true);
merger.AddFromFresh(memRec, &blobBuf, key, step + 1);
- merger.Finish();
-
- bool pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger());
- if (!pushRes) {
+ merger.Finish(false);
+
+ auto push = [&] {
+ TDiskPart preallocatedLocation;
+ bool pushRes = WriterPtr->PushIndexOnly(key, merger.GetMemRec(), &merger.GetDataMerger(), &preallocatedLocation);
+ if (pushRes && merger.GetMemRec().GetType() == TBlobType::DiskBlob && merger.GetMemRec().DataSize()) {
+ const TDiskPart writtenLocation = WriterPtr->PushDataOnly(merger.GetDataMerger().CreateDiskBlob(Arena));
+ Y_ABORT_UNLESS(writtenLocation == preallocatedLocation);
+ }
+ return pushRes;
+ };
+ if (!push()) {
Finish(step);
WriterPtr = std::make_unique<TWriterLogoBlob>(TestCtx.GetVCtx(), EWriterDataType::Fresh, ChunksToUse,
Owner, OwnerRound, ChunkSize, AppendBlockSize, WriteBlockSize, 0, false, ReservedChunks, Arena,
true);
- pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger());
- Y_ABORT_UNLESS(pushRes);
+ Y_ABORT_UNLESS(push());
}
while (auto msg = WriterPtr->GetPendingMessage()) {
Apply(msg);
@@ -212,20 +218,26 @@ namespace NKikimr {
memRec2.SetHugeBlob(TDiskPart(1, 2, 3));
merger.Clear();
- merger.SetLoadDataMode(true);
merger.AddFromFresh(memRec1, nullptr, key, 1);
merger.AddFromFresh(memRec2, nullptr, key, 2);
- merger.Finish();
-
- bool pushRes = WriterPtr->Push(key, merger.GetMemRec(), merger.GetDataMerger());
- if (!pushRes) {
+ merger.Finish(false);
+
+ auto push = [&] {
+ TDiskPart preallocatedLocation;
+ bool pushRes = WriterPtr->PushIndexOnly(key, merger.GetMemRec(), &merger.GetDataMerger(), &preallocatedLocation);
+ if (pushRes && merger.GetMemRec().GetType() == TBlobType::DiskBlob && merger.GetMemRec().DataSize()) {
+ const TDiskPart writtenLocation = WriterPtr->PushDataOnly(merger.GetDataMerger().CreateDiskBlob(Arena));
+ Y_ABORT_UNLESS(writtenLocation == preallocatedLocation);
+ }
+ return pushRes;
+ };
+ if (!push()) {
Finish(step);
WriterPtr = std::make_unique<TWriterLogoBlob>(TestCtx.GetVCtx(), EWriterDataType::Fresh, ChunksToUse,
Owner, OwnerRound, ChunkSize, AppendBlockSize, WriteBlockSize, 0, false, ReservedChunks, Arena,
true);
- pushRes = WriterPtr->Push(key, merger.GetMemRec(), merger.GetDataMerger());
- Y_ABORT_UNLESS(pushRes);
+ Y_ABORT_UNLESS(push());
}
while (auto msg = WriterPtr->GetPendingMessage()) {
Apply(msg);
@@ -243,19 +255,16 @@ namespace NKikimr {
TKeyBlock key(34 + gen);
TMemRecBlock memRec(gen);
merger.Clear();
- merger.SetLoadDataMode(true);
merger.AddFromFresh(memRec, nullptr, key, gen + 1);
- merger.Finish();
+ merger.Finish(false);
- bool pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger());
- if (!pushRes) {
+ if (!WriterPtr->PushIndexOnly(key, memRec, nullptr, nullptr)) {
Finish(gen);
WriterPtr = std::make_unique<TWriterBlock>(TestCtx.GetVCtx(), EWriterDataType::Fresh, ChunksToUse,
Owner, OwnerRound, ChunkSize, AppendBlockSize, WriteBlockSize, 0, false, ReservedChunks, Arena,
true);
- pushRes = WriterPtr->Push(key, memRec, merger.GetDataMerger());
- Y_ABORT_UNLESS(pushRes);
+ Y_ABORT_UNLESS(WriterPtr->PushIndexOnly(key, merger.GetMemRec(), nullptr, nullptr));
}
while (auto msg = WriterPtr->GetPendingMessage()) {
Apply(msg);
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp
index e7063f4ebf1..feb929a6291 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.cpp
@@ -200,7 +200,7 @@ namespace NKikimr {
DIV_CLASS("col-md-1") {
TAG(TH5) { str << "Fresh"; }
const char *comp = Fresh.CompactionInProgress() ? "Compaction In Progress" : "No Compaction";
- H6_CLASS ("text-info") {str << comp << " W:" << FreshCompWritesInFlight;}
+ H6_CLASS ("text-info") {str << comp << " R:" << FreshCompReadsInFlight << "/W:" << FreshCompWritesInFlight;}
}
DIV_CLASS("col-md-11") {Fresh.OutputHtml(str);}
}
@@ -220,6 +220,7 @@ namespace NKikimr {
void TLevelIndex<TKey, TMemRec>::OutputProto(NKikimrVDisk::LevelIndexStat *stat) const {
NKikimrVDisk::FreshStat *fresh = stat->mutable_fresh();
fresh->set_compaction_writes_in_flight(FreshCompWritesInFlight);
+ fresh->set_compaction_reads_in_flight(FreshCompReadsInFlight);
Fresh.OutputProto(fresh);
NKikimrVDisk::SliceStat *slice = stat->mutable_slice();
slice->set_compaction_writes_in_flight(HullCompWritesInFlight);
diff --git a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h
index 2261ffe3fb5..79d5c854d32 100644
--- a/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h
+++ b/ydb/core/blobstorage/vdisk/hulldb/generic/hullds_idx.h
@@ -183,6 +183,7 @@ namespace NKikimr {
ui64 CurEntryPointLsn = ui64(-1);
ui64 PrevEntryPointLsn = ui64(-1);
+ TAtomic FreshCompReadsInFlight = 0;
TAtomic FreshCompWritesInFlight = 0;
TAtomic HullCompReadsInFlight = 0;
TAtomic HullCompWritesInFlight = 0;
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
index 1ab46aaff69..8268554c68f 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
@@ -37,23 +37,30 @@ namespace NKikimr {
const bool BarrierValidation;
TDelayedResponses DelayedResponses;
bool AllowGarbageCollection = false;
+ THugeBlobCtxPtr HugeBlobCtx;
+ ui32 MinREALHugeBlobInBytes;
TFields(TIntrusivePtr<THullDs> hullDs,
TIntrusivePtr<TLsnMngr> &&lsnMngr,
TPDiskCtxPtr &&pdiskCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
const TActorId skeletonId,
bool runHandoff,
TActorSystem *as,
- bool barrierValidation)
+ bool barrierValidation,
+ TActorId hugeKeeperId)
: LogoBlobsRunTimeCtx(std::make_shared<TLogoBlobsRunTimeCtx>(lsnMngr, pdiskCtx,
- skeletonId, runHandoff, hullDs->LogoBlobs))
+ skeletonId, runHandoff, hullDs->LogoBlobs, hugeKeeperId))
, BlocksRunTimeCtx(std::make_shared<TBlocksRunTimeCtx>(lsnMngr, pdiskCtx,
- skeletonId, runHandoff, hullDs->Blocks))
+ skeletonId, runHandoff, hullDs->Blocks, TActorId()))
, BarriersRunTimeCtx(std::make_shared<TBarriersRunTimeCtx>(lsnMngr, pdiskCtx,
- skeletonId, runHandoff, hullDs->Barriers))
+ skeletonId, runHandoff, hullDs->Barriers, TActorId()))
, LsnMngr(std::move(lsnMngr))
, ActorSystem(as)
, BarrierValidation(barrierValidation)
+ , HugeBlobCtx(std::move(hugeBlobCtx))
+ , MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
{}
void CutRecoveryLog(const TActorContext &ctx, std::unique_ptr<NPDisk::TEvCutLog> msg) {
@@ -75,14 +82,17 @@ namespace NKikimr {
THull::THull(
TIntrusivePtr<TLsnMngr> lsnMngr,
TPDiskCtxPtr pdiskCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
const TActorId skeletonId,
bool runHandoff,
THullDbRecovery &&uncond,
TActorSystem *as,
- bool barrierValidation)
+ bool barrierValidation,
+ TActorId hugeKeeperId)
: THullDbRecovery(std::move(uncond))
- , Fields(std::make_unique<TFields>(HullDs, std::move(lsnMngr), std::move(pdiskCtx),
- skeletonId, runHandoff, as, barrierValidation))
+ , Fields(std::make_unique<TFields>(HullDs, std::move(lsnMngr), std::move(pdiskCtx), std::move(hugeBlobCtx),
+ minREALHugeBlobInBytes, skeletonId, runHandoff, as, barrierValidation, hugeKeeperId))
{}
THull::~THull() = default;
@@ -119,8 +129,8 @@ namespace NKikimr {
activeActors.Insert(logNotifierAid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
Fields->SetLogNotifierActorId(logNotifierAid);
// actor for LogoBlobs DB
- HullDs->LogoBlobs->LIActor = ctx.RegisterWithSameMailbox(CreateLogoBlobsActor(config, HullDs, hullLogCtx, loggerId,
- Fields->LogoBlobsRunTimeCtx, syncLogFirstLsnToKeep));
+ HullDs->LogoBlobs->LIActor = ctx.RegisterWithSameMailbox(CreateLogoBlobsActor(config, HullDs, hullLogCtx,
+ Fields->HugeBlobCtx, Fields->MinREALHugeBlobInBytes, loggerId, Fields->LogoBlobsRunTimeCtx, syncLogFirstLsnToKeep));
activeActors.Insert(HullDs->LogoBlobs->LIActor, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
// actor for Blocks DB
HullDs->Blocks->LIActor = ctx.RegisterWithSameMailbox(CreateBlocksActor(config, HullDs, hullLogCtx, loggerId,
@@ -206,8 +216,6 @@ namespace NKikimr {
ReplayAddLogoBlobCmd(ctx, id, partId, ingress, std::move(buffer), lsn, THullDbRecovery::NORMAL);
// run compaction if required
- CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false,
- Fields->AllowGarbageCollection);
}
void THull::AddHugeLogoBlob(
@@ -220,8 +228,7 @@ namespace NKikimr {
ReplayAddHugeLogoBlobCmd(ctx, id, ingress, diskAddr, lsn, THullDbRecovery::NORMAL);
// run compaction if required
- CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false,
- Fields->AllowGarbageCollection);
+ CompactFreshLogoBlobsIfRequired(ctx);
}
void THull::AddLogoBlob(
@@ -234,8 +241,7 @@ namespace NKikimr {
ReplayAddLogoBlobCmd(ctx, id, ingress, seg.Point(), THullDbRecovery::NORMAL);
// run compaction if required
- CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false,
- Fields->AllowGarbageCollection);
+ CompactFreshLogoBlobsIfRequired(ctx);
}
void THull::AddBulkSst(
@@ -280,7 +286,7 @@ namespace NKikimr {
Fields->DelayedResponses.ConfirmLsn(lsn, replySender);
// run compaction if required
- CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, Fields->BlocksRunTimeCtx, ctx, false,
+ CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, nullptr, 0, Fields->BlocksRunTimeCtx, ctx, false,
Fields->AllowGarbageCollection);
}
@@ -474,10 +480,9 @@ namespace NKikimr {
ReplayAddGCCmd(ctx, record, ingress, seg.Last);
// run compaction if required
- CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false,
- Fields->AllowGarbageCollection);
- CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, Fields->BarriersRunTimeCtx, ctx, false,
- Fields->AllowGarbageCollection);
+ CompactFreshLogoBlobsIfRequired(ctx);
+ CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, nullptr, 0, Fields->BarriersRunTimeCtx, ctx,
+ false, Fields->AllowGarbageCollection);
}
void THull::CollectPhantoms(
@@ -615,12 +620,11 @@ namespace NKikimr {
Y_DEBUG_ABORT_UNLESS(curLsn == seg.Last + 1);
// run compaction if required
- CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->LogoBlobsRunTimeCtx, ctx, false,
- Fields->AllowGarbageCollection);
- CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, Fields->BlocksRunTimeCtx, ctx, false,
- Fields->AllowGarbageCollection);
- CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, Fields->BarriersRunTimeCtx, ctx, false,
+ CompactFreshLogoBlobsIfRequired(ctx);
+ CompactFreshSegmentIfRequired<TKeyBlock, TMemRecBlock>(HullDs, nullptr, 0, Fields->BlocksRunTimeCtx, ctx, false,
Fields->AllowGarbageCollection);
+ CompactFreshSegmentIfRequired<TKeyBarrier, TMemRecBarrier>(HullDs, nullptr, 0, Fields->BarriersRunTimeCtx, ctx,
+ false, Fields->AllowGarbageCollection);
}
// run fresh segment or fresh appendix compaction if required
@@ -633,7 +637,7 @@ namespace NKikimr {
bool allowGarbageCollection)
{
// try to start fresh compaction
- bool freshSegmentCompaction = CompactFreshSegmentIfRequired<TKey, TMemRec>(hullDs, rtCtx, ctx, false,
+ bool freshSegmentCompaction = CompactFreshSegmentIfRequired<TKey, TMemRec>(hullDs, nullptr, 0, rtCtx, ctx, false,
allowGarbageCollection);
if (!freshSegmentCompaction) {
// if not, try to start appendix compaction
@@ -707,4 +711,14 @@ namespace NKikimr {
ctx.Send(HullDs->Barriers->LIActor, new TEvPermitGarbageCollection);
}
+ void THull::ApplyHugeBlobSize(ui32 minREALHugeBlobInBytes, const TActorContext& ctx) {
+ Fields->MinREALHugeBlobInBytes = minREALHugeBlobInBytes;
+ ctx.Send(HullDs->LogoBlobs->LIActor, new TEvMinHugeBlobSizeUpdate(minREALHugeBlobInBytes));
+ }
+
+ void THull::CompactFreshLogoBlobsIfRequired(const TActorContext& ctx) {
+ CompactFreshSegmentIfRequired<TKeyLogoBlob, TMemRecLogoBlob>(HullDs, Fields->HugeBlobCtx,
+ Fields->MinREALHugeBlobInBytes, Fields->LogoBlobsRunTimeCtx, ctx, false, Fields->AllowGarbageCollection);
+ }
+
} // NKikimr
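THull now remembers the current MinREALHugeBlobInBytes and pushes updates to the LogoBlobs level-index actor, so subsequent fresh compactions pick up the new threshold. A minimal actor-free sketch of that propagation pattern (hypothetical owner/worker types; the real code sends TEvMinHugeBlobSizeUpdate):

    #include <cassert>
    #include <cstdint>
    #include <functional>

    struct TWorkerSketch {
        uint32_t MinHugeBlobBytes = 0;
        void OnMinHugeBlobSizeUpdate(uint32_t bytes) { MinHugeBlobBytes = bytes; }
    };

    struct TOwnerSketch {
        uint32_t MinHugeBlobBytes;
        std::function<void(uint32_t)> Notify;   // stands in for sending the update event

        void ApplyHugeBlobSize(uint32_t bytes) {
            MinHugeBlobBytes = bytes;           // remember for future compaction starts
            Notify(bytes);                      // propagate to the running worker
        }
    };

    int main() {
        TWorkerSketch worker;
        TOwnerSketch owner{65536, [&](uint32_t b) { worker.OnMinHugeBlobSizeUpdate(b); }};
        owner.ApplyHugeBlobSize(524288);
        assert(worker.MinHugeBlobBytes == 524288);
    }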
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
index 91bbd872507..9d43370da57 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
@@ -1,6 +1,7 @@
#pragma once
#include "defs.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_hulllogctx.h>
+#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h>
#include <ydb/core/blobstorage/vdisk/hulldb/cache_block/cache_block.h>
#include <ydb/core/blobstorage/vdisk/hulldb/recovery/hulldb_recovery.h>
#include <ydb/core/blobstorage/vdisk/hulldb/bulksst_add/hulldb_bulksst_add.h>
@@ -60,11 +61,14 @@ namespace NKikimr {
THull(
TIntrusivePtr<TLsnMngr> lsnMngr,
TPDiskCtxPtr pdiskCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
const TActorId skeletonId,
bool runHandoff,
THullDbRecovery &&uncond,
TActorSystem *as,
- bool barrierValidation);
+ bool barrierValidation,
+ TActorId hugeKeeperId);
THull(const THull &) = delete;
THull(THull &&) = default;
THull &operator =(const THull &) = delete;
@@ -210,6 +214,10 @@ namespace NKikimr {
}
void PermitGarbageCollection(const TActorContext& ctx);
+
+ void ApplyHugeBlobSize(ui32 minREALHugeBlobInBytes, const TActorContext& ctx);
+
+ void CompactFreshLogoBlobsIfRequired(const TActorContext& ctx);
};
// FIXME:
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp
index ca5c5626a30..1edb7cc955e 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp
@@ -77,6 +77,8 @@ namespace NKikimr {
template <class TKey, class TMemRec>
void CompactFreshSegment(
TIntrusivePtr<THullDs> &hullDs,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
std::shared_ptr<TLevelIndexRunTimeCtx<TKey, TMemRec>> &rtCtx,
const TActorContext &ctx,
bool allowGarbageCollection)
@@ -105,8 +107,9 @@ namespace NKikimr {
ui64 firstLsn = freshSegment->GetFirstLsn();
ui64 lastLsn = freshSegment->GetLastLsn();
std::unique_ptr<TFreshCompaction> compaction(new TFreshCompaction(
- hullCtx, rtCtx, freshSegment, freshSegmentSnap, std::move(barriersSnap), std::move(levelSnap),
- mergeElementsApproximation, it, firstLsn, lastLsn, TDuration::Max(), {}, allowGarbageCollection));
+ hullCtx, rtCtx, std::move(hugeBlobCtx), minREALHugeBlobInBytes, freshSegment, freshSegmentSnap,
+ std::move(barriersSnap), std::move(levelSnap), mergeElementsApproximation, it, firstLsn, lastLsn,
+ TDuration::Max(), {}, allowGarbageCollection));
LOG_INFO(ctx, NKikimrServices::BS_HULLCOMP,
VDISKP(hullCtx->VCtx->VDiskLogPrefix,
@@ -172,6 +175,8 @@ namespace NKikimr {
bool CompactionScheduled = false;
TInstant NextCompactionWakeup;
bool AllowGarbageCollection = false;
+ THugeBlobCtxPtr HugeBlobCtx;
+ ui32 MinREALHugeBlobInBytes;
friend class TActorBootstrapped<TThis>;
@@ -228,8 +233,8 @@ namespace NKikimr {
void ScheduleCompaction(const TActorContext &ctx) {
// schedule fresh if required
- CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx, FullCompactionState.ForceFreshCompaction(RTCtx),
- AllowGarbageCollection);
+ CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx, MinREALHugeBlobInBytes, RTCtx, ctx,
+ FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection);
if (!Config->BaseInfo.ReadOnly && !RunLevelCompactionSelector(ctx)) {
ScheduleCompactionWakeup(ctx);
}
@@ -255,10 +260,9 @@ namespace NKikimr {
TLevelSliceForwardIterator it(HullDs->HullCtx, vec);
it.SeekToFirst();
- std::unique_ptr<TLevelCompaction> compaction(new TLevelCompaction(
- HullDs->HullCtx, RTCtx, nullptr, nullptr, std::move(barriersSnap), std::move(levelSnap),
- mergeElementsApproximation, it, firstLsn, lastLsn, TDuration::Minutes(2), {},
- AllowGarbageCollection));
+ std::unique_ptr<TLevelCompaction> compaction(new TLevelCompaction(HullDs->HullCtx, RTCtx, HugeBlobCtx,
+ MinREALHugeBlobInBytes, nullptr, nullptr, std::move(barriersSnap), std::move(levelSnap),
+ mergeElementsApproximation, it, firstLsn, lastLsn, TDuration::Minutes(2), {}, AllowGarbageCollection));
NActors::TActorId actorId = RunInBatchPool(ctx, compaction.release());
ActiveActors.Insert(actorId, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
}
@@ -417,8 +421,10 @@ namespace NKikimr {
// run level committer
TDiskPartVec removedHugeBlobs(CompactionTask->ExtractHugeBlobsToDelete());
+ TDiskPartVec allocatedHugeBlobs(CompactionTask->GetHugeBlobsAllocated());
auto committer = std::make_unique<TAsyncLevelCommitter>(HullLogCtx, HullDbCommitterCtx, RTCtx->LevelIndex,
- ctx.SelfID, std::move(chunksAdded), std::move(deleteChunks), std::move(removedHugeBlobs), wId);
+ ctx.SelfID, std::move(chunksAdded), std::move(deleteChunks), std::move(removedHugeBlobs),
+ std::move(allocatedHugeBlobs), wId);
TActorId committerID = ctx.RegisterWithSameMailbox(committer.release());
ActiveActors.Insert(committerID, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
@@ -493,14 +499,14 @@ namespace NKikimr {
// run fresh committer
auto committer = std::make_unique<TAsyncFreshCommitter>(HullLogCtx, HullDbCommitterCtx, RTCtx->LevelIndex,
ctx.SelfID, std::move(msg->CommitChunks), std::move(msg->ReservedChunks),
- std::move(msg->FreedHugeBlobs), dbg.Str(), wId);
+ std::move(msg->FreedHugeBlobs), std::move(msg->AllocatedHugeBlobs), dbg.Str(), wId);
auto aid = ctx.RegisterWithSameMailbox(committer.release());
ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
} else {
Y_ABORT_UNLESS(RTCtx->LevelIndex->GetCompState() == TLevelIndexBase::StateCompInProgress);
CompactionTask->CompactSsts.CompactionFinished(std::move(msg->SegVec),
- std::move(msg->FreedHugeBlobs), msg->Aborted);
+ std::move(msg->FreedHugeBlobs), std::move(msg->AllocatedHugeBlobs), msg->Aborted);
if (msg->Aborted) { // if the compaction was aborted, ensure there was no index change
Y_ABORT_UNLESS(CompactionTask->GetSstsToAdd().Empty());
@@ -565,8 +571,8 @@ namespace NKikimr {
if (FullCompactionState.Enabled()) {
ScheduleCompaction(ctx);
} else {
- CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx,
- FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection);
+ CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx, MinREALHugeBlobInBytes, RTCtx,
+ ctx, FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection);
}
break;
case THullCommitFinished::CommitAdvanceLsn:
@@ -589,8 +595,9 @@ namespace NKikimr {
const ui64 freeUpToLsn = ev->Get()->FreeUpToLsn;
RTCtx->SetFreeUpToLsn(freeUpToLsn);
// we check if we need to start fresh compaction, FreeUpToLsn influence our decision
- const bool freshCompStarted = CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, RTCtx, ctx,
- FullCompactionState.ForceFreshCompaction(RTCtx), AllowGarbageCollection);
+ const bool freshCompStarted = CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx,
+ MinREALHugeBlobInBytes, RTCtx, ctx, FullCompactionState.ForceFreshCompaction(RTCtx),
+ AllowGarbageCollection);
// just for valid info output to the log
bool moveEntryPointStarted = false;
if (!freshCompStarted && !AdvanceCommitInProgress) {
@@ -648,6 +655,8 @@ namespace NKikimr {
case E::FRESH_ONLY:
Y_ABORT_UNLESS(FreshOnlyCompactQ.empty() || FreshOnlyCompactQ.back().first <= confirmedLsn);
FreshOnlyCompactQ.emplace_back(confirmedLsn, ev);
+ CompactFreshSegmentIfRequired<TKey, TMemRec>(HullDs, HugeBlobCtx, MinREALHugeBlobInBytes, RTCtx, ctx,
+ true, AllowGarbageCollection); // ask for forced fresh compaction
break;
}
@@ -676,6 +685,10 @@ namespace NKikimr {
AllowGarbageCollection = true;
}
+ void Handle(TEvMinHugeBlobSizeUpdate::TPtr ev, const TActorContext& /*ctx*/) {
+ MinREALHugeBlobInBytes = ev->Get()->MinREALHugeBlobInBytes;
+ }
+
STRICT_STFUNC(StateFunc,
HFunc(THullCommitFinished, Handle)
HFunc(NPDisk::TEvCutLog, Handle)
@@ -688,6 +701,7 @@ namespace NKikimr {
HFunc(TEvents::TEvPoisonPill, HandlePoison)
CFunc(TEvBlobStorage::EvPermitGarbageCollection, HandlePermitGarbageCollection)
HFunc(TEvHugePreCompactResult, Handle)
+ HFunc(TEvMinHugeBlobSizeUpdate, Handle)
)
public:
@@ -699,6 +713,8 @@ namespace NKikimr {
TIntrusivePtr<TVDiskConfig> config,
TIntrusivePtr<THullDs> hullDs,
std::shared_ptr<THullLogCtx> hullLogCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
TActorId loggerId,
std::shared_ptr<TRunTimeCtx> rtCtx,
std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep)
@@ -721,6 +737,8 @@ namespace NKikimr {
, CompactionTask(new TCompactionTask)
, ActiveActors(RTCtx->LevelIndex->ActorCtx->ActiveActors)
, LevelStat(HullDs->HullCtx->VCtx->VDiskCounters)
+ , HugeBlobCtx(std::move(hugeBlobCtx))
+ , MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
{}
};
@@ -728,12 +746,13 @@ namespace NKikimr {
TIntrusivePtr<TVDiskConfig> config,
TIntrusivePtr<THullDs> hullDs,
std::shared_ptr<THullLogCtx> hullLogCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
TActorId loggerId,
std::shared_ptr<TLevelIndexRunTimeCtx<TKeyLogoBlob, TMemRecLogoBlob>> rtCtx,
std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep) {
-
- return new TLevelIndexActor<TKeyLogoBlob, TMemRecLogoBlob>(
- config, hullDs, hullLogCtx, loggerId, rtCtx, syncLogFirstLsnToKeep);
+ return new TLevelIndexActor<TKeyLogoBlob, TMemRecLogoBlob>(config, hullDs, hullLogCtx, std::move(hugeBlobCtx),
+ minREALHugeBlobInBytes, loggerId, rtCtx, syncLogFirstLsnToKeep);
}
NActors::IActor* CreateBlocksActor(
@@ -743,9 +762,8 @@ namespace NKikimr {
TActorId loggerId,
std::shared_ptr<TLevelIndexRunTimeCtx<TKeyBlock, TMemRecBlock>> rtCtx,
std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep) {
-
- return new TLevelIndexActor<TKeyBlock, TMemRecBlock>(
- config, hullDs, hullLogCtx, loggerId, rtCtx, syncLogFirstLsnToKeep);
+ return new TLevelIndexActor<TKeyBlock, TMemRecBlock>(config, hullDs, hullLogCtx, nullptr, 0, loggerId, rtCtx,
+ syncLogFirstLsnToKeep);
}
NActors::IActor* CreateBarriersActor(
@@ -755,8 +773,7 @@ namespace NKikimr {
TActorId loggerId,
std::shared_ptr<TLevelIndexRunTimeCtx<TKeyBarrier, TMemRecBarrier>> rtCtx,
std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep) {
-
- return new TLevelIndexActor<TKeyBarrier, TMemRecBarrier>(
- config, hullDs, hullLogCtx, loggerId, rtCtx, syncLogFirstLsnToKeep);
+ return new TLevelIndexActor<TKeyBarrier, TMemRecBarrier>(config, hullDs, hullLogCtx, nullptr, 0, loggerId, rtCtx,
+ syncLogFirstLsnToKeep);
}
}
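
The actor plumbing above threads HugeBlobCtx and MinREALHugeBlobInBytes into the level-index actor and adds a TEvMinHugeBlobSizeUpdate handler so the huge-blob threshold can change at runtime. A minimal standard-C++ sketch of the intended decision follows; the threshold and part sizes are made up for the example, and TLevelIndexActorModel/IsHugeBlob are illustrative names, not the real API.

#include <cstdint>
#include <cstdio>

// Simplified model: a part is routed to a huge slot once its size reaches the
// runtime-updatable MinREALHugeBlobInBytes threshold.
struct TLevelIndexActorModel {
    uint32_t MinREALHugeBlobInBytes = 512 << 10;   // assumed initial value

    // counterpart of the TEvMinHugeBlobSizeUpdate handler added in this patch
    void HandleMinHugeBlobSizeUpdate(uint32_t newValue) {
        MinREALHugeBlobInBytes = newValue;
    }

    bool IsHugeBlob(uint32_t partSizeInBytes) const {
        return partSizeInBytes >= MinREALHugeBlobInBytes;
    }
};

int main() {
    TLevelIndexActorModel actor;
    std::printf("64 KiB part huge? %d\n", actor.IsHugeBlob(64 << 10));   // 0
    actor.HandleMinHugeBlobSizeUpdate(32 << 10);                         // threshold lowered at runtime
    std::printf("64 KiB part huge? %d\n", actor.IsHugeBlob(64 << 10));   // 1
}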
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h
index 4a9b773f4b2..e740a9f47e0 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.h
@@ -2,6 +2,7 @@
#include "defs.h"
#include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all.h>
+#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h>
namespace NKikimr {
@@ -23,6 +24,7 @@ namespace NKikimr {
const bool RunHandoff;
const TIntrusivePtr<TLevelIndex<TKey, TMemRec>> LevelIndex;
ui64 FreeUpToLsn = 0;
+ const TActorId HugeKeeperId;
private:
// ActorId of the LogCutterNotifier, that aggregates cut log lsn for the Hull database
TActorId LogNotifierActorId;
@@ -32,12 +34,14 @@ namespace NKikimr {
TPDiskCtxPtr pdiskCtx,
const TActorId skeletonId,
bool runHandoff,
- TIntrusivePtr<TLevelIndex<TKey, TMemRec>> levelIndex)
+ TIntrusivePtr<TLevelIndex<TKey, TMemRec>> levelIndex,
+ const TActorId hugeKeeperId)
: LsnMngr(std::move(lsnMngr))
, PDiskCtx(std::move(pdiskCtx))
, SkeletonId(skeletonId)
, RunHandoff(runHandoff)
, LevelIndex(std::move(levelIndex))
+ , HugeKeeperId(hugeKeeperId)
{
Y_ABORT_UNLESS(LsnMngr && PDiskCtx && LevelIndex);
}
@@ -79,6 +83,8 @@ namespace NKikimr {
template <class TKey, class TMemRec>
void CompactFreshSegment(
TIntrusivePtr<THullDs> &hullDs,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
std::shared_ptr<TLevelIndexRunTimeCtx<TKey, TMemRec>> &rtCtx,
const TActorContext &ctx,
bool allowGarbageCollection);
@@ -86,6 +92,8 @@ namespace NKikimr {
template <class TKey, class TMemRec>
bool CompactFreshSegmentIfRequired(
TIntrusivePtr<THullDs> &hullDs,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
std::shared_ptr<TLevelIndexRunTimeCtx<TKey, TMemRec>> &rtCtx,
const TActorContext &ctx,
bool force,
@@ -94,7 +102,8 @@ namespace NKikimr {
ui64 yardFreeUpToLsn = rtCtx->GetFreeUpToLsn();
bool compact = hullDs->HullCtx->FreshCompaction && rtCtx->LevelIndex->NeedsFreshCompaction(yardFreeUpToLsn, force);
if (compact) {
- CompactFreshSegment<TKey, TMemRec>(hullDs, rtCtx, ctx, allowGarbageCollection);
+ CompactFreshSegment<TKey, TMemRec>(hullDs, std::move(hugeBlobCtx), minREALHugeBlobInBytes, rtCtx, ctx,
+ allowGarbageCollection);
}
return compact;
}
@@ -106,6 +115,8 @@ namespace NKikimr {
TIntrusivePtr<TVDiskConfig> config,
TIntrusivePtr<THullDs> hullDs,
std::shared_ptr<THullLogCtx> hullLogCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
TActorId loggerId,
std::shared_ptr<TLevelIndexRunTimeCtx<TKeyLogoBlob, TMemRecLogoBlob>> rtCtx,
std::shared_ptr<NSyncLog::TSyncLogFirstLsnToKeep> syncLogFirstLsnToKeep);
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h
index d6ab9b6dabc..0abc2be976e 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h
@@ -62,6 +62,7 @@ namespace NKikimr {
TVector<ui32> CommitChunks; // chunks to commit within this log entry
TVector<ui32> DeleteChunks; // chunks to delete
TDiskPartVec RemovedHugeBlobs; // freed huge blobs
+ TDiskPartVec AllocatedHugeBlobs;
TLevelSegmentPtr ReplSst; // pointer to replicated SST
ui32 NumRecoveredBlobs; // number of blobs in this SST (valid only for replicated tables)
bool DeleteToDecommitted;
@@ -70,10 +71,12 @@ namespace NKikimr {
THullCommitMeta(TVector<ui32>&& chunksAdded,
TVector<ui32>&& chunksDeleted,
TDiskPartVec&& removedHugeBlobs,
+ TDiskPartVec&& allocatedHugeBlobs,
bool prevSliceActive)
: CommitChunks(std::move(chunksAdded))
, DeleteChunks(std::move(chunksDeleted))
, RemovedHugeBlobs(std::move(removedHugeBlobs))
+ , AllocatedHugeBlobs(std::move(allocatedHugeBlobs))
, NumRecoveredBlobs(0)
, DeleteToDecommitted(prevSliceActive)
{}
@@ -140,9 +143,9 @@ namespace NKikimr {
// notify delayed deleter when log record is actually written; we MUST ensure that updates are coming in
// order of increasing LSN's; this is achieved automatically as all actors reside on the same mailbox
LevelIndex->DelayedCompactionDeleterInfo->Update(LsnSeg.Last, std::move(Metadata.RemovedHugeBlobs),
- CommitRecord.DeleteToDecommitted ? CommitRecord.DeleteChunks : TVector<TChunkIdx>(),
- PDiskSignatureForHullDbKey<TKey>(), WId, ctx, Ctx->HugeKeeperId, Ctx->SkeletonId, Ctx->PDiskCtx,
- Ctx->HullCtx->VCtx);
+ std::move(Metadata.AllocatedHugeBlobs), CommitRecord.DeleteToDecommitted ? CommitRecord.DeleteChunks :
+ TVector<TChunkIdx>(), PDiskSignatureForHullDbKey<TKey>(), WId, ctx, Ctx->HugeKeeperId, Ctx->SkeletonId,
+ Ctx->PDiskCtx, Ctx->HullCtx->VCtx);
NPDisk::TEvLogResult* msg = ev->Get();
@@ -223,6 +226,7 @@ namespace NKikimr {
NKikimrVDiskData::THullDbEntryPoint pb;
LevelIndex->SerializeToProto(*pb.MutableLevelIndex());
Metadata.RemovedHugeBlobs.SerializeToProto(*pb.MutableRemovedHugeBlobs());
+ Metadata.AllocatedHugeBlobs.SerializeToProto(*pb.MutableAllocatedHugeBlobs());
return THullDbSignatureRoutines::Serialize(pb);
}
@@ -323,7 +327,7 @@ namespace NKikimr {
std::move(levelIndex),
notifyID,
TActorId(),
- {TVector<ui32>(), TVector<ui32>(), TDiskPartVec(), false},
+ {TVector<ui32>(), TVector<ui32>(), TDiskPartVec(), TDiskPartVec(), false},
callerInfo,
0)
{}
@@ -351,6 +355,7 @@ namespace NKikimr {
TVector<ui32>&& chunksAdded,
TVector<ui32>&& chunksDeleted,
TDiskPartVec&& removedHugeBlobs,
+ TDiskPartVec&& allocatedHugeBlobs,
const TString &callerInfo,
ui64 wId)
: TBase(std::move(hullLogCtx),
@@ -358,7 +363,7 @@ namespace NKikimr {
std::move(levelIndex),
notifyID,
TActorId(),
- {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), false},
+ {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), std::move(allocatedHugeBlobs), false},
callerInfo,
wId)
{}
@@ -383,13 +388,14 @@ namespace NKikimr {
TVector<ui32>&& chunksAdded,
TVector<ui32>&& chunksDeleted,
TDiskPartVec&& removedHugeBlobs,
+ TDiskPartVec&& allocatedHugeBlobs,
ui64 wId)
: TBase(std::move(hullLogCtx),
std::move(ctx),
std::move(levelIndex),
notifyID,
TActorId(),
- {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), true},
+ {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), std::move(allocatedHugeBlobs), true},
TString(),
wId)
{}
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h
index c9e7af00039..73ffcdc8c39 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h
@@ -30,6 +30,7 @@ namespace NKikimr {
TIntrusivePtr<TFreshSegment> FreshSegment;
// huge blobs to delete after compaction
TDiskPartVec FreedHugeBlobs;
+ TDiskPartVec AllocatedHugeBlobs;
// was the compaction process aborted by some reason?
bool Aborted = false;
@@ -85,7 +86,8 @@ namespace NKikimr {
// messages we have to send to Yard
TVector<std::unique_ptr<IEventBase>> MsgsForYard;
- TActorId SkeletonId;
+ const TActorId SkeletonId;
+ const TActorId HugeKeeperId;
bool IsAborting = false;
ui32 PendingResponses = 0;
@@ -140,13 +142,18 @@ namespace NKikimr {
// there are events, we send them to yard; worker internally controls all in flight limits and does not
// generate more events than allowed; this function returns boolean status indicating whether compaction job
// is finished or not
- const bool done = Worker.MainCycle(MsgsForYard);
+ std::vector<ui32> *slotsToAllocate = nullptr;
+ const bool done = Worker.MainCycle(MsgsForYard, &slotsToAllocate);
// check if there are messages we have for yard
for (std::unique_ptr<IEventBase>& msg : MsgsForYard) {
ctx.Send(PDiskCtx->PDiskId, msg.release());
++PendingResponses;
}
MsgsForYard.clear();
+ // send slots to allocate to huge keeper, if any
+ if (slotsToAllocate) {
+ ctx.Send(HugeKeeperId, new TEvHugeAllocateSlots(std::move(*slotsToAllocate)));
+ }
// when done, continue with other state
if (done) {
Finalize(ctx);
@@ -228,11 +235,17 @@ namespace NKikimr {
MainCycle(ctx);
}
+ void Handle(TEvHugeAllocateSlotsResult::TPtr ev, const TActorContext& ctx) {
+ Worker.Apply(ev->Get());
+ MainCycle(ctx);
+ }
+
STRICT_STFUNC(WorkFunc,
HFunc(NPDisk::TEvChunkReserveResult, HandleYardResponse)
HFunc(NPDisk::TEvChunkWriteResult, HandleYardResponse)
HFunc(NPDisk::TEvChunkReadResult, HandleYardResponse)
HFunc(TEvRestoreCorruptedBlobResult, Handle)
+ HFunc(TEvHugeAllocateSlotsResult, Handle)
HFunc(TEvents::TEvPoisonPill, HandlePoison)
)
///////////////////////// WORK: END /////////////////////////////////////////////////
@@ -264,6 +277,11 @@ namespace NKikimr {
PDiskSignatureForHullDbKey<TKey>().ToString().data(), CompactionID,
(FreshSegment ? "true" : "false"), Worker.GetFreedHugeBlobs().ToString().data()));
msg->FreedHugeBlobs = IsAborting ? TDiskPartVec() : Worker.GetFreedHugeBlobs();
+ msg->AllocatedHugeBlobs = IsAborting ? TDiskPartVec() : Worker.GetAllocatedHugeBlobs();
+
+ if (IsAborting) { // release previously preallocated slots for huge blobs if we are aborting
+ ctx.Send(HugeKeeperId, new TEvHugeDropAllocatedSlots(Worker.GetAllocatedHugeBlobs().Vec));
+ }
// chunks to commit
msg->CommitChunks = IsAborting ? TVector<ui32>() : Worker.GetCommitChunks();
@@ -304,6 +322,8 @@ namespace NKikimr {
THullCompaction(THullCtxPtr hullCtx,
const std::shared_ptr<TLevelIndexRunTimeCtx> &rtCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
TIntrusivePtr<TFreshSegment> freshSegment,
std::shared_ptr<TFreshSegmentSnapshot> freshSegmentSnap,
TBarriersSnapshot &&barriersSnap,
@@ -326,10 +346,11 @@ namespace NKikimr {
, Hmp(CreateHandoffMap<TKey, TMemRec>(HullCtx, rtCtx->RunHandoff, rtCtx->SkeletonId))
, Gcmp(CreateGcMap<TKey, TMemRec>(HullCtx, mergeElementsApproximation, allowGarbageCollection))
, It(it)
- , Worker(HullCtx, PDiskCtx, rtCtx->LevelIndex, it, (bool)FreshSegment, firstLsn, lastLsn, restoreDeadline,
- partitionKey)
+ , Worker(HullCtx, PDiskCtx, std::move(hugeBlobCtx), minREALHugeBlobInBytes, rtCtx->LevelIndex, it,
+ static_cast<bool>(FreshSegment), firstLsn, lastLsn, restoreDeadline, partitionKey)
, CompactionID(TAppData::RandomProvider->GenRand64())
, SkeletonId(rtCtx->SkeletonId)
+ , HugeKeeperId(rtCtx->HugeKeeperId)
{}
};
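
The compaction actor above now forwards the worker's slot requests to the huge keeper (TEvHugeAllocateSlots) and drops the preallocated slots again when aborting. A rough standalone model of that round trip is sketched below, with plain enums and integers standing in for the actual events and TDiskPart locations.

#include <cstdint>
#include <cstdio>
#include <vector>

// Toy state machine: MainCycle stops in a WaitForSlotAllocation-like state and
// exposes the sizes to allocate; the reply from the huge keeper resumes it.
enum class EState { GetNextItem, WaitForSlotAllocation, TryProcessItem };

struct TWorkerModel {
    EState State = EState::GetNextItem;
    std::vector<uint32_t> SlotsToAllocate;       // sizes of huge parts to place

    // returns true when the actor has to ask the huge keeper before continuing
    bool MainCycle() {
        if (!SlotsToAllocate.empty()) {
            State = EState::WaitForSlotAllocation;
            return true;
        }
        State = EState::TryProcessItem;
        return false;
    }

    void ApplyAllocatedSlots(const std::vector<uint64_t>& locations) {
        State = EState::TryProcessItem;          // resume exactly where we stopped
        SlotsToAllocate.clear();
        std::printf("got %zu slots, continuing compaction\n", locations.size());
    }
};

int main() {
    TWorkerModel worker;
    worker.SlotsToAllocate = {1u << 20};         // one ~1 MiB huge part to place
    if (worker.MainCycle()) {
        worker.ApplyAllocatedSlots({42});        // huge keeper reply (location is fake)
    }
}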
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h
index 2c8aed0c3d8..72cc57b617c 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue.h
@@ -15,18 +15,18 @@ namespace NKikimr {
ui64 Id;
ui32 NumReads;
TDiskPart PreallocatedLocation;
- TDiskBlobMergerWithMask Merger;
- NMatrix::TVectorType PartsToStore;
+ TDiskBlobMerger Merger;
TLogoBlobID BlobId;
+ bool IsInline;
TItem(ui64 id, ui32 numReads, const TDiskPart& preallocatedLocation, const TDiskBlobMerger& merger,
- NMatrix::TVectorType partsToStore, const TLogoBlobID& blobId)
+ const TLogoBlobID& blobId, bool isInline)
: Id(id)
, NumReads(numReads)
, PreallocatedLocation(preallocatedLocation)
- , Merger(merger, partsToStore)
- , PartsToStore(partsToStore)
+ , Merger(merger)
, BlobId(blobId)
+ , IsInline(isInline)
{}
};
@@ -57,12 +57,12 @@ namespace NKikimr {
ProcessItemQueue();
}
- void AddReadDiskBlob(ui64 id, TRope&& buffer, NMatrix::TVectorType expectedParts) {
+ void AddReadDiskBlob(ui64 id, TRope&& buffer, ui8 partIdx) {
Y_ABORT_UNLESS(Started);
Y_ABORT_UNLESS(ItemQueue);
TItem& item = ItemQueue.front();
Y_ABORT_UNLESS(item.Id == id);
- item.Merger.Add(TDiskBlob(&buffer, expectedParts, GType, item.BlobId));
+ item.Merger.AddPart(std::move(buffer), GType, TLogoBlobID(item.BlobId, partIdx + 1));
Y_ABORT_UNLESS(item.NumReads > 0);
if (!--item.NumReads) {
ProcessItemQueue();
@@ -89,12 +89,9 @@ namespace NKikimr {
}
void ProcessItem(TItem& item) {
- // ensure that we have all the parts we must have
- Y_ABORT_UNLESS(item.Merger.GetDiskBlob().GetParts() == item.PartsToStore);
-
// get newly generated blob raw content and put it into writer queue
static_cast<TDerived&>(*this).ProcessItemImpl(item.PreallocatedLocation, item.Merger.CreateDiskBlob(Arena,
- AddHeader));
+ AddHeader), item.IsInline);
}
};
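
With the mask-based merger gone, a deferred item in the queue above simply counts outstanding single-part reads and completes in strict FIFO order once they all arrive. The following toy standard-C++ model shows that bookkeeping; strings stand in for part data and a printf replaces the ProcessItemImpl callback, so none of these names are the real API.

#include <cassert>
#include <cstdint>
#include <cstdio>
#include <deque>
#include <string>
#include <vector>

struct TDeferredItem {
    uint64_t Id;
    uint32_t NumReads;                 // outstanding disk reads for this blob
    std::vector<std::string> Parts;    // stands in for TDiskBlobMerger
    bool IsInline;                     // inline SST data vs. separate huge-blob write
};

class TDeferredQueue {
    std::deque<TDeferredItem> Items;

    void ProcessReady() {
        // items complete strictly in FIFO order, never overtaking each other
        while (!Items.empty() && Items.front().NumReads == 0) {
            const TDeferredItem& item = Items.front();
            std::printf("item %lu done: %zu parts, %s\n", (unsigned long)item.Id,
                        item.Parts.size(), item.IsInline ? "inline" : "huge");
            Items.pop_front();
        }
    }

public:
    void Put(uint64_t id, uint32_t numReads, bool isInline) {
        Items.push_back({id, numReads, {}, isInline});
        ProcessReady();                // items needing no reads may complete at once
    }

    void AddReadDiskBlob(uint64_t id, std::string part) {
        assert(!Items.empty() && Items.front().Id == id);   // reads arrive in order
        TDeferredItem& item = Items.front();
        item.Parts.push_back(std::move(part));
        assert(item.NumReads > 0);
        if (--item.NumReads == 0) {
            ProcessReady();
        }
    }
};

int main() {
    TDeferredQueue q;
    q.Put(1, 2, true);                 // inline blob assembled from two on-disk parts
    q.Put(2, 0, false);                // huge blob, already fully in memory, waits behind item 1
    q.AddReadDiskBlob(1, "part#1");
    q.AddReadDiskBlob(1, "part#3");    // completing item 1 also releases item 2
}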
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp
index aa24409982a..4f278eb1f5a 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactdeferredqueue_ut.cpp
@@ -8,7 +8,7 @@
using namespace NKikimr;
-const TBlobStorageGroupType GType(TBlobStorageGroupType::ErasureNone);
+const TBlobStorageGroupType GType(TBlobStorageGroupType::Erasure4Plus2Block);
std::unordered_map<TString, size_t> StringToId;
std::deque<TRope> IdToRope;
@@ -29,7 +29,7 @@ class TTestDeferredQueue : public TDeferredItemQueueBase<TTestDeferredQueue> {
void StartImpl() {
}
- void ProcessItemImpl(const TDiskPart& /*preallocatedLocation*/, const TRope& buffer) {
+ void ProcessItemImpl(const TDiskPart& /*preallocatedLocation*/, const TRope& buffer, bool /*isInline*/) {
Results.push_back(GetResMapId(buffer));
}
@@ -47,23 +47,16 @@ public:
Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) {
Y_UNIT_TEST(Basic) {
- TRope parts[6] = {
- TRope(TString("AAAAAA")),
- TRope(TString("BBBBBB")),
- TRope(TString("CCCCCC")),
- TRope(TString("DDDDDD")),
- TRope(TString("EEEEEE")),
- TRope(TString("FFFFFF")),
- };
-
- ui32 fullDataSize = 32;
+ TString data = "AAABBBCCCDDDEEEFFF";
+ TLogoBlobID blobId(0, 0, 0, 0, data.size(), 0);
+ std::array<TRope, 6> parts;
+ ErasureSplit(static_cast<TErasureType::ECrcMode>(blobId.CrcMode()), GType, TRope(data), parts);
std::unordered_map<TString, size_t> resm;
struct TItem {
ssize_t BlobId;
NMatrix::TVectorType BlobParts;
- NMatrix::TVectorType PartsToStore;
TVector<std::pair<size_t, NMatrix::TVectorType>> DiskData;
size_t Expected;
};
@@ -72,17 +65,15 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) {
TRopeArena arena(&TRopeArenaBackend::Allocate);
auto process = [&](ui32 mem, ui32 masks0, ui32 masks1, ui32 numDiskParts) {
- ui32 masks[3] = {masks0, masks1, 0};
+ ui32 masks[2] = {masks0, masks1};
TItem item;
TDiskBlobMerger expm;
// create initial memory merger
- for (ui8 i = 0; i < 6; ++i) {
+ for (ui8 i = 0; i < GType.TotalPartCount(); ++i) {
if (mem >> i & 1) {
- TRope buf = TDiskBlob::Create(fullDataSize, i + 1, 6, TRope(parts[i]), arena, true);
- TDiskBlob blob(&buf, NMatrix::TVectorType::MakeOneHot(i, 6), GType, TLogoBlobID(0, 0, 0, 0, parts[i].GetSize(), 0));
- expm.Add(blob);
+ expm.AddPart(TRope(parts[i]), GType, TLogoBlobID(blobId, i + 1));
}
}
@@ -95,47 +86,28 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) {
item.BlobParts = expm.GetDiskBlob().GetParts();
// generate disk parts
- ui64 wholeMask = mem;
for (ui32 i = 0; i < numDiskParts; ++i) {
const ui32 mask = masks[i];
Y_ABORT_UNLESS(mask);
TDiskBlobMerger m;
- for (ui8 i = 0; i < 6; ++i) {
+ for (ui8 i = 0; i < GType.TotalPartCount(); ++i) {
if (mask >> i & 1) {
- TRope buf = TDiskBlob::Create(fullDataSize, i + 1, 6, TRope(parts[i]), arena, true);
- TDiskBlob blob(&buf, NMatrix::TVectorType::MakeOneHot(i, 6), GType, TLogoBlobID(0, 0, 0, 0, parts[i].GetSize(), 0));
- m.Add(blob);
- expm.Add(blob);
+ m.AddPart(TRope(parts[i]), GType, TLogoBlobID(blobId, i + 1));
+ expm.AddPart(TRope(parts[i]), GType, TLogoBlobID(blobId, i + 1));
}
}
TRope buf = m.CreateDiskBlob(arena, true);
Y_ABORT_UNLESS(buf);
item.DiskData.emplace_back(GetResMapId(buf), m.GetDiskBlob().GetParts());
-
- wholeMask |= mask;
}
// generate parts to store vector and store item
- for (ui32 p = 1; p < 64; ++p) {
- if ((p & wholeMask) == p) {
- NMatrix::TVectorType v(0, 6);
- for (ui8 i = 0; i < 6; ++i) {
- if (p >> i & 1) {
- v.Set(i);
- }
- }
- item.PartsToStore = v;
-
- TDiskBlobMergerWithMask mx;
- mx.SetFilterMask(v);
- mx.Add(expm.GetDiskBlob());
- item.Expected = GetResMapId(mx.CreateDiskBlob(arena, true));
-
- items.push_back(item);
- }
+ if (!expm.Empty()) {
+ item.Expected = GetResMapId(expm.CreateDiskBlob(arena, true));
+ items.push_back(item);
}
};
@@ -173,22 +145,28 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) {
// prepare item merger for this sample item
TDiskBlobMerger merger;
if (item.BlobId != -1) {
- merger.Add(TDiskBlob(&IdToRope[item.BlobId], item.BlobParts, GType, TLogoBlobID(0, 0, 0, 0, 6, 0)));
+ merger.Add(TDiskBlob(&IdToRope[item.BlobId], item.BlobParts, GType, blobId));
}
// put the item to queue
- q.Put(id, item.DiskData.size(), TDiskPart(0, 0, 0), std::move(merger), item.PartsToStore, TLogoBlobID(0, 0, 0, 0, 6, 0));
- for (auto& p : item.DiskData) {
- itemQueue.emplace(id, IdToRope[p.first], p.second);
+ ui32 numReads = 0;
+ for (auto& [ropeId, parts] : item.DiskData) {
+ itemQueue.emplace(id, IdToRope[ropeId], parts);
+ numReads += parts.CountBits();
}
+ q.Put(id, numReads, TDiskPart(0, 0, 0), std::move(merger), blobId, true);
referenceResults.push_back(item.Expected);
++id;
}
q.Start();
while (itemQueue) {
- auto& front = itemQueue.front();
- q.AddReadDiskBlob(std::get<0>(front), std::move(std::get<1>(front)), std::get<2>(front));
+ auto& [id, buffer, parts] = itemQueue.front();
+ TDiskBlob blob(&buffer, parts, GType, blobId);
+ for (ui8 partIdx : parts) {
+ TRope holder;
+ q.AddReadDiskBlob(id, TRope(blob.GetPart(partIdx, &holder)), partIdx);
+ }
itemQueue.pop();
UNIT_ASSERT_VALUES_EQUAL(q.AllProcessed(), itemQueue.empty());
}
@@ -196,7 +174,7 @@ Y_UNIT_TEST_SUITE(TBlobStorageHullCompactDeferredQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(referenceResults.size(), q.Results.size());
for (ui32 i = 0; i < referenceResults.size(); ++i) {
- UNIT_ASSERT_EQUAL(referenceResults[i], q.Results[i]);
+ UNIT_ASSERT_VALUES_EQUAL(IdToRope[referenceResults[i]].ConvertToString(), IdToRope[q.Results[i]].ConvertToString());
}
}
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h
index b2bb2e1eec8..6fa069c7032 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompactworker.h
@@ -7,18 +7,20 @@
#include <ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullwritesst.h>
#include <ydb/core/blobstorage/vdisk/hulldb/blobstorage_hullgcmap.h>
#include <ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.h>
+#include <ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h>
namespace NKikimr {
template<typename TKey, typename TMemRec, typename TIterator>
class THullCompactionWorker {
+ static constexpr bool LogoBlobs = std::is_same_v<TKey, TKeyLogoBlob>;
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// COMMON TYPE ALIASES
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// handoff map
using THandoffMap = NKikimr::THandoffMap<TKey, TMemRec>;
- using TTransformedItem = typename THandoffMap::TTransformedItem;
using THandoffMapPtr = TIntrusivePtr<THandoffMap>;
// garbage collector map
@@ -41,26 +43,41 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class TDeferredItemQueue : public TDeferredItemQueueBase<TDeferredItemQueue> {
- TWriter *Writer = nullptr;
+ THullCompactionWorker *Worker = nullptr;
friend class TDeferredItemQueueBase<TDeferredItemQueue>;
- void StartImpl(TWriter *writer) {
- Y_ABORT_UNLESS(!Writer);
- Writer = writer;
- Y_ABORT_UNLESS(Writer);
+ void StartImpl(THullCompactionWorker *worker) {
+ Y_ABORT_UNLESS(!Worker);
+ Worker = worker;
+ Y_ABORT_UNLESS(Worker);
+ Y_ABORT_UNLESS(Worker->WriterPtr);
}
- void ProcessItemImpl(const TDiskPart& preallocatedLocation, TRope&& buffer) {
- TDiskPart writtenLocation = Writer->PushDataOnly(std::move(buffer));
+ void ProcessItemImpl(const TDiskPart& preallocatedLocation, TRope&& buffer, bool isInline) {
+ Y_DEBUG_ABORT_UNLESS(Worker);
- // ensure that item was written into preallocated position
- Y_ABORT_UNLESS(writtenLocation == preallocatedLocation);
+ if (isInline) {
+ const TDiskPart writtenLocation = Worker->WriterPtr->PushDataOnly(std::move(buffer));
+ Y_ABORT_UNLESS(writtenLocation == preallocatedLocation);
+ } else {
+ Y_ABORT_UNLESS(preallocatedLocation.Size == buffer.GetSize());
+ size_t fullSize = buffer.GetSize();
+ if (const size_t misalign = fullSize % Worker->PDiskCtx->Dsk->AppendBlockSize) {
+ fullSize += Worker->PDiskCtx->Dsk->AppendBlockSize - misalign;
+ }
+ auto partsPtr = MakeIntrusive<NPDisk::TEvChunkWrite::TRopeAlignedParts>(std::move(buffer), fullSize);
+ void *cookie = nullptr;
+ auto write = std::make_unique<NPDisk::TEvChunkWrite>(Worker->PDiskCtx->Dsk->Owner,
+ Worker->PDiskCtx->Dsk->OwnerRound, preallocatedLocation.ChunkIdx, preallocatedLocation.Offset,
+ partsPtr, cookie, true, NPriWrite::HullComp, false);
+ Worker->PendingWrites.push_back(std::move(write));
+ }
}
void FinishImpl() {
- Y_ABORT_UNLESS(Writer);
- Writer = nullptr;
+ Y_ABORT_UNLESS(Worker);
+ Worker = nullptr;
}
public:
@@ -77,6 +94,7 @@ namespace NKikimr {
enum class EState {
Invalid, // invalid state; this state should never be reached
GetNextItem, // going to extract next item for processing or finish if there are no more items
+ WaitForSlotAllocation, // waiting for slot allocation from huge keeper
TryProcessItem, // trying to write item into SST
WaitingForDeferredItems,
FlushingSST, // flushing SST to disk
@@ -97,6 +115,8 @@ namespace NKikimr {
// basic contexts
THullCtxPtr HullCtx;
TPDiskCtxPtr PDiskCtx;
+ THugeBlobCtxPtr HugeBlobCtx;
+ ui32 MinREALHugeBlobInBytes;
// Group Type
const TBlobStorageGroupType GType;
@@ -132,11 +152,12 @@ namespace NKikimr {
// record merger for compaction
TCompactRecordMerger IndexMerger;
- // current handoff-transformed item
- const TTransformedItem *TransformedItem = nullptr;
+ // current handoff-transformed MemRec
+ std::optional<TMemRec> MemRec;
// SST writer
std::unique_ptr<TWriter> WriterPtr;
+ bool WriterHasPendingOperations = false;
// number of chunks we have asked to reserve, but not yet confirmed
ui32 ChunkReservePending = 0;
@@ -154,10 +175,11 @@ namespace NKikimr {
ui32 InFlightReads = 0;
// maximum number of such requests
- ui32 MaxInFlightReads;
+ ui32 MaxInFlightReads = 0;
// vector of freed huge blobs
TDiskPartVec FreedHugeBlobs;
+ TDiskPartVec AllocatedHugeBlobs;
// generated level segments
TVector<TIntrusivePtr<TLevelSegment>> LevelSegments;
@@ -173,14 +195,14 @@ namespace NKikimr {
struct TBatcherPayload {
ui64 Id = 0;
- NMatrix::TVectorType LocalParts; // a bit vector of local parts we are going to read from this disk blob
+ ui8 PartIdx;
TLogoBlobID BlobId;
TDiskPart Location;
TBatcherPayload() = default;
- TBatcherPayload(ui64 id, NMatrix::TVectorType localParts, TLogoBlobID blobId, TDiskPart location)
+ TBatcherPayload(ui64 id, ui8 partIdx, TLogoBlobID blobId, TDiskPart location)
: Id(id)
- , LocalParts(localParts)
+ , PartIdx(partIdx)
, BlobId(blobId)
, Location(location)
{}
@@ -193,11 +215,8 @@ namespace NKikimr {
TDeferredItemQueue DeferredItems;
ui64 NextDeferredItemId = 1;
- // previous key (in case of logoblobs)
- TKey PreviousKey;
-
- // is this the first key?
- bool IsFirstKey = true;
+ TKey Key; // current key
+ std::optional<TKey> PreviousKey; // previous key (nullopt for the first iteration)
public:
struct TStatistics {
@@ -260,13 +279,18 @@ namespace NKikimr {
TStatistics Statistics;
TDuration RestoreDeadline;
+
// Partition key is used for splitting resulting SSTs by the PartitionKey if present. Partition key
// is used for compaction policy implementation to limit the number of intermediate chunks during compaction.
std::optional<TKey> PartitionKey;
+ std::deque<std::unique_ptr<NPDisk::TEvChunkWrite>> PendingWrites;
+
public:
THullCompactionWorker(THullCtxPtr hullCtx,
TPDiskCtxPtr pdiskCtx,
+ THugeBlobCtxPtr hugeBlobCtx,
+ ui32 minREALHugeBlobInBytes,
TIntrusivePtr<TLevelIndex> levelIndex,
const TIterator& it,
bool isFresh,
@@ -276,6 +300,8 @@ namespace NKikimr {
std::optional<TKey> partitionKey)
: HullCtx(std::move(hullCtx))
, PDiskCtx(std::move(pdiskCtx))
+ , HugeBlobCtx(std::move(hugeBlobCtx))
+ , MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
, GType(HullCtx->VCtx->Top->GType)
, LevelIndex(std::move(levelIndex))
, FirstLsn(firstLsn)
@@ -295,8 +321,8 @@ namespace NKikimr {
if (IsFresh) {
ChunksToUse = HullCtx->HullSstSizeInChunksFresh;
MaxInFlightWrites = HullCtx->FreshCompMaxInFlightWrites;
- MaxInFlightReads = 0;
- ReadsInFlight = nullptr;
+ MaxInFlightReads = HullCtx->FreshCompMaxInFlightReads;
+ ReadsInFlight = &LevelIndex->FreshCompReadsInFlight;
WritesInFlight = &LevelIndex->FreshCompWritesInFlight;
} else {
ChunksToUse = HullCtx->HullSstSizeInChunksLevel;
@@ -315,7 +341,7 @@ namespace NKikimr {
// main cycle function; return true if compaction is finished and compaction actor can proceed to index load;
// when there is more work to do, return false; MUST NOT return true unless all pending requests are finished
- bool MainCycle(TVector<std::unique_ptr<IEventBase>>& msgsForYard) {
+ bool MainCycle(TVector<std::unique_ptr<IEventBase>>& msgsForYard, std::vector<ui32> **slotAllocations) {
for (;;) {
switch (State) {
case EState::Invalid:
@@ -323,35 +349,28 @@ namespace NKikimr {
case EState::GetNextItem:
if (It.Valid()) {
- const TKey& key = It.GetCurKey();
- if (IsFirstKey) {
- IsFirstKey = false;
- } else {
- Y_ABORT_UNLESS(!key.IsSameAs(PreviousKey), "duplicate keys: %s -> %s",
- PreviousKey.ToString().data(), key.ToString().data());
- }
- PreviousKey = key;
+ Key = It.GetCurKey();
+ Y_ABORT_UNLESS(!PreviousKey || *PreviousKey < Key, "duplicate keys: %s -> %s",
+ PreviousKey->ToString().data(), Key.ToString().data());
// iterator is valid and we have one more item to process; instruct merger whether we want
// data or not and proceed to TryProcessItem state
Y_ABORT_UNLESS(GcmpIt.Valid());
- IndexMerger.SetLoadDataMode(GcmpIt.KeepData());
It.PutToMerger(&IndexMerger);
const bool haveToProcessItem = PreprocessItem();
- if (haveToProcessItem) {
+ if (!haveToProcessItem) {
+ FinishItem();
+ } else if (TDataMerger& dataMerger = IndexMerger.GetDataMerger(); !LogoBlobs ||
+ dataMerger.GetSlotsToAllocate().empty()) {
State = EState::TryProcessItem;
} else {
- FinishItem();
+ State = EState::WaitForSlotAllocation;
+ *slotAllocations = &dataMerger.GetSlotsToAllocate();
+ return false; // expect allocated slots to continue compacting
}
} else if (WriterPtr) {
- // start processing deferred items
- DeferredItems.Start(WriterPtr.get());
- // start batcher
- ReadBatcher.Start();
- // iterator is not valid and we have writer with items (because we don't create writer
- // unless there are actual items we want to write); proceed to WaitingForDeferredItems state
- State = EState::WaitingForDeferredItems;
+ StartCollectingDeferredItems();
} else {
// iterator is not valid and we have no writer -- so just proceed to WaitForPendingRequests
// state and finish
@@ -359,18 +378,17 @@ namespace NKikimr {
}
break;
+ case EState::WaitForSlotAllocation:
+ return false;
+
case EState::TryProcessItem:
// ensure we have transformed item
- Y_ABORT_UNLESS(TransformedItem);
+ Y_ABORT_UNLESS(MemRec);
// try to process it
- ETryProcessItemStatus status;
- status = TryProcessItem();
- switch (status) {
+ switch (TryProcessItem()) {
case ETryProcessItemStatus::Success:
- // try to send some messages if this is the fresh segment
- if (IsFresh) {
- ProcessPendingMessages(msgsForYard);
- }
+ // try to send some messages if needed
+ ProcessPendingMessages(msgsForYard);
// finalize item
FinishItem();
// continue with next item
@@ -387,18 +405,13 @@ namespace NKikimr {
return false;
case ETryProcessItemStatus::FinishSST:
- // start processing deferred items
- DeferredItems.Start(WriterPtr.get());
- // start batcher
- ReadBatcher.Start();
- // wait
- State = EState::WaitingForDeferredItems;
+ StartCollectingDeferredItems();
break;
}
break;
case EState::WaitingForDeferredItems:
- ProcessPendingMessages(msgsForYard);
+ ProcessPendingMessages(msgsForYard); // issue any messages generated by deferred items queue
if (!DeferredItems.AllProcessed()) {
return false;
}
@@ -408,17 +421,20 @@ namespace NKikimr {
ReadBatcher.Finish();
break;
- case EState::FlushingSST:
- // do not continue processing if there are too many writes in flight
- if (InFlightWrites >= MaxInFlightWrites) {
+ case EState::FlushingSST: {
+ // if MemRec is set, then this state was invoked from the TryProcessItem call
+ const bool finished = FlushSST();
+ State = !finished ? State : MemRec ? EState::TryProcessItem : EState::GetNextItem;
+ ProcessPendingMessages(msgsForYard); // issue any generated messages
+ if (finished) {
+ Y_ABORT_UNLESS(!WriterPtr->GetPendingMessage());
+ WriterPtr.reset();
+ } else {
+ Y_ABORT_UNLESS(InFlightWrites == MaxInFlightWrites);
return false;
}
- // try to flush SST
- if (FlushSST(msgsForYard)) {
- // we return to state from which finalization was invoked
- State = TransformedItem ? EState::TryProcessItem : EState::GetNextItem;
- }
break;
+ }
case EState::WaitForPendingRequests:
// wait until all writes succeed
@@ -440,6 +456,12 @@ namespace NKikimr {
}
}
+ void StartCollectingDeferredItems() {
+ DeferredItems.Start(this);
+ ReadBatcher.Start();
+ State = EState::WaitingForDeferredItems;
+ }
+
TEvRestoreCorruptedBlob *Apply(NPDisk::TEvChunkReadResult *msg, TInstant now) {
AtomicDecrement(*ReadsInFlight);
Y_ABORT_UNLESS(InFlightReads > 0);
@@ -459,13 +481,9 @@ namespace NKikimr {
auto& item = msg->Items.front();
switch (item.Status) {
case NKikimrProto::OK: {
- std::array<TRope, 8> parts;
- ui32 numParts = 0;
- for (ui32 i = item.Needed.FirstPosition(); i != item.Needed.GetSize(); i = item.Needed.NextPosition(i)) {
- parts[numParts++] = std::move(item.Parts[i]);
- }
- DeferredItems.AddReadDiskBlob(item.Cookie, TDiskBlob::CreateFromDistinctParts(&parts[0],
- &parts[numParts], item.Needed, item.BlobId.BlobSize(), Arena, HullCtx->AddHeader), item.Needed);
+ Y_DEBUG_ABORT_UNLESS(item.Needed.CountBits() == 1);
+ const ui8 partIdx = item.Needed.FirstPosition();
+ DeferredItems.AddReadDiskBlob(item.Cookie, std::move(item.Parts[partIdx]), partIdx);
return ProcessReadBatcher(now);
}
@@ -491,11 +509,12 @@ namespace NKikimr {
while (ReadBatcher.GetResultItem(&serial, &payload, &status, &buffer)) {
if (status == NKikimrProto::CORRUPTED) {
ExpectingBlobRestoration = true;
- TEvRestoreCorruptedBlob::TItem item(payload.BlobId, payload.LocalParts, GType, payload.Location, payload.Id);
+ const auto needed = NMatrix::TVectorType::MakeOneHot(payload.PartIdx, GType.TotalPartCount());
+ TEvRestoreCorruptedBlob::TItem item(payload.BlobId, needed, GType, payload.Location, payload.Id);
return new TEvRestoreCorruptedBlob(now + RestoreDeadline, {1u, item}, false, true);
} else {
Y_ABORT_UNLESS(status == NKikimrProto::OK);
- DeferredItems.AddReadDiskBlob(payload.Id, TRope(std::move(buffer)), payload.LocalParts);
+ DeferredItems.AddReadDiskBlob(payload.Id, TRope(std::move(buffer)), payload.PartIdx);
}
}
return nullptr;
@@ -517,14 +536,28 @@ namespace NKikimr {
AllocatedChunks.insert(AllocatedChunks.end(), msg->ChunkIds.begin(), msg->ChunkIds.end());
}
+ void Apply(TEvHugeAllocateSlotsResult *msg) {
+ if constexpr (LogoBlobs) {
+ Y_DEBUG_ABORT_UNLESS(State == EState::WaitForSlotAllocation);
+ State = EState::TryProcessItem;
+ for (const TDiskPart& p : msg->Locations) { // remember newly allocated slots for entrypoint
+ AllocatedHugeBlobs.PushBack(p);
+ }
+ IndexMerger.GetDataMerger().ApplyAllocatedSlots(msg->Locations);
+ } else {
+ Y_ABORT("impossible case");
+ }
+ }
+
const TVector<TIntrusivePtr<TLevelSegment>>& GetLevelSegments() { return LevelSegments; }
const TVector<TChunkIdx>& GetCommitChunks() const { return CommitChunks; }
const TDiskPartVec& GetFreedHugeBlobs() const { return FreedHugeBlobs; }
+ const TDiskPartVec& GetAllocatedHugeBlobs() const { return AllocatedHugeBlobs; }
const TDeque<TChunkIdx>& GetReservedChunks() const { return ReservedChunks; }
const TDeque<TChunkIdx>& GetAllocatedChunks() const { return AllocatedChunks; }
private:
- void CollectRemovedHugeBlobs(const TVector<TDiskPart> &hugeBlobs) {
+ void CollectRemovedHugeBlobs(const std::vector<TDiskPart>& hugeBlobs) {
for (const TDiskPart& p : hugeBlobs) {
if (!p.Empty()) {
FreedHugeBlobs.PushBack(p);
@@ -535,33 +568,43 @@ namespace NKikimr {
// start item processing; this function transforms item using handoff map and adds collected huge blobs, if any
// it returns true if we should keep this item; otherwise it returns false
bool PreprocessItem() {
- const TKey key = It.GetCurKey();
-
// finish merging data for this item
- IndexMerger.Finish();
+ if constexpr (LogoBlobs) {
+ IndexMerger.Finish(HugeBlobCtx->IsHugeBlob(GType, Key.LogoBlobID(), MinREALHugeBlobInBytes));
+ } else {
+ IndexMerger.Finish(false);
+ }
// reset transformed item and try to create new one if we want to keep this item
const bool keepData = GcmpIt.KeepData();
const bool keepItem = GcmpIt.KeepItem();
- TransformedItem = Hmp->Transform(key, &IndexMerger.GetMemRec(), IndexMerger.GetDataMerger(), keepData, keepItem);
if (keepItem) {
++(keepData ? Statistics.KeepItemsWithData : Statistics.KeepItemsWOData);
} else {
++Statistics.DontKeepItems;
}
- // collect huge blobs -- we unconditionally delete DeletedData and save SavedData only in case
- // when this blob is written -- that is, when TransformedItem is set.
- const TDataMerger *dataMerger = TransformedItem ? TransformedItem->DataMerger : IndexMerger.GetDataMerger();
- if (!TransformedItem) {
- CollectRemovedHugeBlobs(dataMerger->GetHugeBlobMerger().SavedData());
+ TDataMerger& dataMerger = IndexMerger.GetDataMerger();
+ if (keepItem) {
+ Hmp->Transform(Key, MemRec.emplace(IndexMerger.GetMemRec()), dataMerger, keepData);
}
- CollectRemovedHugeBlobs(dataMerger->GetHugeBlobMerger().DeletedData());
- return TransformedItem != nullptr;
+ if constexpr (LogoBlobs) {
+ if (!keepItem) { // we are deleting this item too, so we drop saved huge blobs here
+ CollectRemovedHugeBlobs(dataMerger.GetSavedHugeBlobs());
+ }
+ CollectRemovedHugeBlobs(dataMerger.GetDeletedHugeBlobs());
+ }
+
+ return keepItem;
}
ETryProcessItemStatus TryProcessItem() {
+ // if we have PartitionKey, check whether it is time to split partitions by PartitionKey
+ if (PartitionKey && PreviousKey && *PreviousKey < *PartitionKey && Key <= *PartitionKey && WriterPtr) {
+ return ETryProcessItemStatus::FinishSST;
+ }
+
// if there is no active writer, create one and start writing
if (!WriterPtr) {
// ensure we have enough reserved chunks to do operation; or else request for allocation and wait
@@ -575,106 +618,73 @@ namespace NKikimr {
(ui32)PDiskCtx->Dsk->ChunkSize, PDiskCtx->Dsk->AppendBlockSize,
(ui32)PDiskCtx->Dsk->BulkWriteBlockSize, LevelIndex->AllocSstId(), false, ReservedChunks, Arena,
HullCtx->AddHeader);
+
+ WriterHasPendingOperations = false;
}
- // if we have PartitionKey, check it is time to split partitions by PartitionKey
- if (PartitionKey && !IsFirstKey && PreviousKey < *PartitionKey && TransformedItem->Key >= *PartitionKey &&
- !WriterPtr->Empty()) {
+ // try to push blob to the index
+ TDataMerger& dataMerger = IndexMerger.GetDataMerger();
+ TDiskPart preallocatedLocation;
+ if (!WriterPtr->PushIndexOnly(Key, *MemRec, LogoBlobs ? &dataMerger : nullptr, &preallocatedLocation)) {
return ETryProcessItemStatus::FinishSST;
}
- // special logic for fresh: we just put this item into data segment as usual, do not preallocate and then
- // write data
- if (IsFresh) {
- const bool itemWritten = WriterPtr->Push(TransformedItem->Key, *TransformedItem->MemRec,
- TransformedItem->DataMerger);
- if (itemWritten) {
- Statistics.ItemAdded();
- return ETryProcessItemStatus::Success;
+ // count added item
+ Statistics.ItemAdded();
+
+ if constexpr (LogoBlobs) {
+ const TLogoBlobID& blobId = Key.LogoBlobID();
+
+ auto& collectTask = dataMerger.GetCollectTask();
+ if (MemRec->GetType() == TBlobType::DiskBlob && MemRec->DataSize()) {
+ // ensure preallocated location has correct size
+ Y_DEBUG_ABORT_UNLESS(preallocatedLocation.ChunkIdx && preallocatedLocation.Size == MemRec->DataSize());
+ // producing inline blob with data here
+ for (const auto& [location, partIdx] : collectTask.Reads) {
+ ReadBatcher.AddReadItem(location, {NextDeferredItemId, partIdx, blobId, location});
+ }
+ if (!collectTask.Reads.empty() || WriterHasPendingOperations) { // defer this blob
+ DeferredItems.Put(NextDeferredItemId++, collectTask.Reads.size(), preallocatedLocation,
+ collectTask.BlobMerger, blobId, true);
+ WriterHasPendingOperations = true;
+ } else { // we can and will produce this inline blob now
+ const TDiskPart writtenLocation = WriterPtr->PushDataOnly(dataMerger.CreateDiskBlob(Arena));
+ Y_ABORT_UNLESS(writtenLocation == preallocatedLocation);
+ }
} else {
- return ETryProcessItemStatus::FinishSST;
+ Y_ABORT_UNLESS(collectTask.BlobMerger.Empty());
+ Y_ABORT_UNLESS(collectTask.Reads.empty());
}
- }
-
- // calculate inplaced data size: extract TLogoBlobID from the key, calculate total blob size, then reduce
- // it to part size using layout information and finally calculate serialized blob size using number of local
- // parts stored in this record; if inplacedDataSize is zero, then we do not store any data inside SSTable,
- // otherwise we store DiskBlob and have to assemble it at data pass
- ui32 inplacedDataSize = 0;
- const NMatrix::TVectorType partsToStore = TransformedItem->MemRec->GetLocalParts(GType);
- if (TransformedItem->DataMerger->GetType() == TBlobType::DiskBlob && !partsToStore.Empty()) {
- inplacedDataSize = TDiskBlob::CalculateBlobSize(GType, TransformedItem->Key.LogoBlobID(), partsToStore,
- HullCtx->AddHeader);
- }
- // try to push item into SST; in case of failure there is not enough space to fit this item
- TDiskPart preallocatedLocation;
- if (WriterPtr->PushIndexOnly(TransformedItem->Key, *TransformedItem->MemRec, TransformedItem->DataMerger,
- inplacedDataSize, &preallocatedLocation)) {
- // count added item
- Statistics.ItemAdded();
-
- // if we do generate some small blob, we have to enqueue it in Deferred Items queue and then possibly
- // issue some reads
- if (inplacedDataSize != 0) {
- ui32 numReads = 0;
- auto lambda = [&](const TMemRec& memRec) {
- Y_ABORT_UNLESS(memRec.GetType() == TBlobType::DiskBlob && memRec.HasData());
-
- // find out where our disk blob resides
- TDiskDataExtractor extr;
- memRec.GetDiskData(&extr, nullptr);
- TDiskPart location = extr.SwearOne();
-
- // get its vector of local parts stored in that location
- NMatrix::TVectorType parts = memRec.GetLocalParts(GType);
-
- // enqueue read
- ReadBatcher.AddReadItem(location.ChunkIdx, location.Offset, location.Size,
- TBatcherPayload(NextDeferredItemId, parts, It.GetCurKey().LogoBlobID(), location));
-
- ++numReads;
- };
- IndexMerger.ForEachSmallDiskBlob(std::move(lambda));
-
- // either we read something or it is already in memory
- const TDiskBlobMerger& diskBlobMerger = TransformedItem->DataMerger->GetDiskBlobMerger();
- Y_ABORT_UNLESS(!diskBlobMerger.Empty() || numReads > 0, "Key# %s MemRec# %s LocalParts# %s KeepData# %s",
- It.GetCurKey().ToString().data(),
- TransformedItem->MemRec->ToString(HullCtx->IngressCache.Get(), nullptr).data(),
- TransformedItem->MemRec->GetLocalParts(GType).ToString().data(),
- GcmpIt.KeepData() ? "true" : "false");
-
- // TODO(alexvru): maybe we should get rid of copying DiskBlobMerger?
- DeferredItems.Put(NextDeferredItemId, numReads, preallocatedLocation, diskBlobMerger, partsToStore,
- It.GetCurKey().LogoBlobID());
-
- // advance deferred item identifier
- ++NextDeferredItemId;
+ for (const auto& [partIdx, from, to] : dataMerger.GetHugeBlobWrites()) {
+ const auto parts = NMatrix::TVectorType::MakeOneHot(partIdx, GType.TotalPartCount());
+ DeferredItems.Put(NextDeferredItemId++, 0, to, TDiskBlob(from, parts, GType, blobId), blobId, false);
}
- // return success indicating that this item required no further processing
- return ETryProcessItemStatus::Success;
- } else {
- // return failure meaning this item should be restarted after flushing current SST
- return ETryProcessItemStatus::FinishSST;
+ for (const auto& [partIdx, from, to] : dataMerger.GetHugeBlobMoves()) {
+ ReadBatcher.AddReadItem(from, {NextDeferredItemId, partIdx, blobId, from});
+ DeferredItems.Put(NextDeferredItemId++, 1, to, TDiskBlobMerger(), blobId, false);
+ }
}
+
+ // return success indicating that this item required no further processing
+ return ETryProcessItemStatus::Success;
}
void FinishItem() {
+ // adjust previous key
+ PreviousKey.emplace(Key);
// clear merger and on-disk record list and advance both iterators synchronously
IndexMerger.Clear();
It.Next();
GcmpIt.Next();
- TransformedItem = nullptr;
+ MemRec.reset();
}
- bool FlushSST(TVector<std::unique_ptr<IEventBase>>& msgsForYard) {
+ bool FlushSST() {
// try to flush some more data; if the flush fails, it means that we have reached in flight write limit and
// there is nothing to do here now, so we return
- const bool flushDone = WriterPtr->FlushNext(FirstLsn, LastLsn, MaxInFlightWrites - InFlightWrites);
- ProcessPendingMessages(msgsForYard);
- if (!flushDone) {
+ if (!WriterPtr->FlushNext(FirstLsn, LastLsn, MaxInFlightWrites - InFlightWrites)) {
return false;
}
@@ -683,20 +693,18 @@ namespace NKikimr {
LevelSegments.push_back(conclusion.LevelSegment);
CommitChunks.insert(CommitChunks.end(), conclusion.UsedChunks.begin(), conclusion.UsedChunks.end());
- // ensure that all writes were executed and drop writer
- Y_ABORT_UNLESS(!WriterPtr->GetPendingMessage());
- WriterPtr.reset();
-
return true;
}
void ProcessPendingMessages(TVector<std::unique_ptr<IEventBase>>& msgsForYard) {
// ensure that we have writer
Y_ABORT_UNLESS(WriterPtr);
+ Y_ABORT_UNLESS(MaxInFlightWrites);
+ Y_ABORT_UNLESS(MaxInFlightReads);
// send new messages until we reach in flight limit
std::unique_ptr<NPDisk::TEvChunkWrite> msg;
- while (InFlightWrites < MaxInFlightWrites && (msg = WriterPtr->GetPendingMessage())) {
+ while (InFlightWrites < MaxInFlightWrites && (msg = GetPendingWriteMessage())) {
HullCtx->VCtx->CountCompactionCost(*msg);
Statistics.Update(msg.get());
msgsForYard.push_back(std::move(msg));
@@ -715,6 +723,15 @@ namespace NKikimr {
}
}
+ std::unique_ptr<NPDisk::TEvChunkWrite> GetPendingWriteMessage() {
+ std::unique_ptr<NPDisk::TEvChunkWrite> res = WriterPtr->GetPendingMessage();
+ if (!res && !PendingWrites.empty()) {
+ res = std::move(PendingWrites.front());
+ PendingWrites.pop_front();
+ }
+ return res;
+ }
+
std::unique_ptr<NPDisk::TEvChunkReserve> CheckForReservation() {
if (ReservedChunks.size() + ChunkReservePending >= ChunksToUse) {
return nullptr;
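
For non-inline items, ProcessItemImpl in the worker above pads the payload to a whole number of PDisk append blocks before queuing the chunk write to the preallocated huge slot. The sketch below isolates that round-up; the 4096-byte AppendBlockSize is an assumed example value, not taken from the configuration.

#include <cstddef>
#include <cstdio>

// Round a payload size up to the next multiple of the append block size.
constexpr size_t AlignUpToAppendBlock(size_t size, size_t appendBlockSize) {
    const size_t misalign = size % appendBlockSize;
    return misalign ? size + (appendBlockSize - misalign) : size;
}

int main() {
    const size_t appendBlockSize = 4096;                     // hypothetical PDisk setting
    for (size_t payload : {1u, 4096u, 5000u}) {
        std::printf("payload %zu -> padded write of %zu bytes\n",
                    payload, AlignUpToAppendBlock(payload, appendBlockSize));
    }
}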
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h
index 879f30e984c..071117234df 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch.h
@@ -2,6 +2,7 @@
#include "defs.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h>
+#include <ydb/core/blobstorage/vdisk/common/disk_part.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk.h>
#include <util/generic/queue.h>
@@ -115,9 +116,10 @@ namespace NKikimr {
// enqueue read item -- a read from specific chunk at desired position and length; function returns serial
// number of this request; all results are then reported sequentially in ascending order of returned serial
- ui64 AddReadItem(TChunkIdx chunkIdx, ui32 offset, ui32 size, TPayload&& payload) {
+ ui64 AddReadItem(TDiskPart location, TPayload&& payload) {
const ui64 serial = NextSerial++;
- ReadQueue.push_back(TReadItem{chunkIdx, offset, size, std::move(payload), serial, NKikimrProto::UNKNOWN, {}});
+ ReadQueue.push_back(TReadItem{location.ChunkIdx, location.Offset, location.Size, std::move(payload), serial,
+ NKikimrProto::UNKNOWN, {}});
return serial;
}
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp
index f23ad8640c4..535d4e6b69e 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_readbatch_ut.cpp
@@ -66,7 +66,7 @@ Y_UNIT_TEST_SUITE(ReadBatcher) {
const ui32 offset = rng() % chunkSize;
const ui32 len = 1 + rng() % Min<ui32>(100000, chunkSize + 1 - offset);
TPayload payload{it->first, offset, len};
- batcher.AddReadItem(it->first, offset, len, std::move(payload));
+ batcher.AddReadItem({it->first, offset, len}, std::move(payload));
}
batcher.Start();
diff --git a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp
index 4b64cb675a4..fc7d8b46790 100644
--- a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.cpp
@@ -12,11 +12,9 @@ namespace NKikimr {
TReleaseQueueItem& item = ReleaseQueue.front();
if (CurrentSnapshots.empty() || (item.RecordLsn <= CurrentSnapshots.begin()->first)) {
// matching record -- commit it to huge hull keeper and throw out of the queue
- if (item.WId) {
+ if (!item.RemovedHugeBlobs.Empty() || !item.AllocatedHugeBlobs.Empty()) {
ctx.Send(hugeKeeperId, new TEvHullFreeHugeSlots(std::move(item.RemovedHugeBlobs),
- item.RecordLsn, item.Signature, item.WId));
- } else {
- Y_ABORT_UNLESS(item.RemovedHugeBlobs.Empty());
+ std::move(item.AllocatedHugeBlobs), item.RecordLsn, item.Signature, item.WId));
}
if (item.ChunksToForget) {
LOG_DEBUG(ctx, NKikimrServices::BS_VDISK_CHUNKS, VDISKP(vctx->VDiskLogPrefix,
diff --git a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h
index 34e6a6c3447..92bf649012c 100644
--- a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h
+++ b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h
@@ -55,14 +55,16 @@ namespace NKikimr {
struct TReleaseQueueItem {
ui64 RecordLsn;
TDiskPartVec RemovedHugeBlobs;
+ TDiskPartVec AllocatedHugeBlobs;
TVector<TChunkIdx> ChunksToForget;
TLogSignature Signature;
ui64 WId;
- TReleaseQueueItem(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TVector<TChunkIdx> chunksToForget,
- TLogSignature signature, ui64 wId)
+ TReleaseQueueItem(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TDiskPartVec&& allocatedHugeBlobs,
+ TVector<TChunkIdx> chunksToForget, TLogSignature signature, ui64 wId)
: RecordLsn(recordLsn)
, RemovedHugeBlobs(std::move(removedHugeBlobs))
+ , AllocatedHugeBlobs(std::move(allocatedHugeBlobs))
, ChunksToForget(std::move(chunksToForget))
, Signature(signature)
, WId(wId)
@@ -89,17 +91,21 @@ namespace NKikimr {
// this function is called every time compaction is about to commit a new entrypoint containing at least
// one removed huge blob; recordLsn is the allocated LSN of this entrypoint
- void Update(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TVector<TChunkIdx> chunksToForget, TLogSignature signature,
- ui64 wId, const TActorContext& ctx, const TActorId& hugeKeeperId, const TActorId& skeletonId,
- const TPDiskCtxPtr& pdiskCtx, const TVDiskContextPtr& vctx) {
+ void Update(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TDiskPartVec&& allocatedHugeBlobs,
+ TVector<TChunkIdx> chunksToForget, TLogSignature signature, ui64 wId, const TActorContext& ctx,
+ const TActorId& hugeKeeperId, const TActorId& skeletonId, const TPDiskCtxPtr& pdiskCtx,
+ const TVDiskContextPtr& vctx) {
Y_ABORT_UNLESS(recordLsn > LastDeletionLsn);
LastDeletionLsn = recordLsn;
LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLCOMP, vctx->VDiskLogPrefix
- << "TDelayedCompactionDeleter: Update recordLsn# " << recordLsn << " removedHugeBlobs.size# "
- << removedHugeBlobs.Size() << " CurrentSnapshots.size# " << CurrentSnapshots.size()
+ << "TDelayedCompactionDeleter: Update recordLsn# " << recordLsn
+ << " removedHugeBlobs.size# " << removedHugeBlobs.Size()
+ << " allocatedHugeBlobs.size# " << allocatedHugeBlobs.Size()
+ << " CurrentSnapshots.size# " << CurrentSnapshots.size()
<< " CurrentSnapshots.front# " << (CurrentSnapshots.empty() ? 0 : CurrentSnapshots.begin()->first)
<< " CurrentSnapshots.back# " << (CurrentSnapshots.empty() ? 0 : (--CurrentSnapshots.end())->first));
- ReleaseQueue.emplace_back(recordLsn, std::move(removedHugeBlobs), std::move(chunksToForget), signature, wId);
+ ReleaseQueue.emplace_back(recordLsn, std::move(removedHugeBlobs), std::move(allocatedHugeBlobs),
+ std::move(chunksToForget), signature, wId);
ProcessReleaseQueue(ctx, hugeKeeperId, skeletonId, pdiskCtx, vctx);
}
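
The delayed deleter above now queues both freed and newly allocated huge-blob locations per entrypoint LSN and notifies the huge keeper only once no older snapshot pins the record. Below is a simplified standalone model of that gating; integers stand in for TDiskPart, a printf replaces TEvHullFreeHugeSlots, and the class name is illustrative.

#include <cstdint>
#include <cstdio>
#include <deque>
#include <set>
#include <vector>

struct TReleaseItem {
    uint64_t RecordLsn;
    std::vector<uint64_t> RemovedHugeBlobs;
    std::vector<uint64_t> AllocatedHugeBlobs;
};

class TDelayedDeleterModel {
    std::deque<TReleaseItem> ReleaseQueue;
    std::multiset<uint64_t> CurrentSnapshots;    // LSNs of live snapshots

    void ProcessReleaseQueue() {
        while (!ReleaseQueue.empty()) {
            const TReleaseItem& item = ReleaseQueue.front();
            if (!CurrentSnapshots.empty() && item.RecordLsn > *CurrentSnapshots.begin()) {
                break;                           // an older snapshot still pins this record
            }
            if (!item.RemovedHugeBlobs.empty() || !item.AllocatedHugeBlobs.empty()) {
                std::printf("lsn %lu: free %zu, confirm %zu huge slots\n",
                            (unsigned long)item.RecordLsn,
                            item.RemovedHugeBlobs.size(), item.AllocatedHugeBlobs.size());
            }
            ReleaseQueue.pop_front();
        }
    }

public:
    void TakeSnapshot(uint64_t lsn) { CurrentSnapshots.insert(lsn); }

    void ReleaseSnapshot(uint64_t lsn) {
        auto it = CurrentSnapshots.find(lsn);
        if (it != CurrentSnapshots.end()) {
            CurrentSnapshots.erase(it);
        }
        ProcessReleaseQueue();
    }

    void Update(uint64_t lsn, std::vector<uint64_t> removed, std::vector<uint64_t> allocated) {
        ReleaseQueue.push_back({lsn, std::move(removed), std::move(allocated)});
        ProcessReleaseQueue();
    }
};

int main() {
    TDelayedDeleterModel deleter;
    deleter.TakeSnapshot(10);
    deleter.Update(12, {100, 101}, {200});       // held back by the snapshot at lsn 10
    deleter.ReleaseSnapshot(10);                 // now the huge keeper is notified
}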
diff --git a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h
index 7f5010430b1..ecfe2175ef6 100644
--- a/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h
+++ b/ydb/core/blobstorage/vdisk/ingress/blobstorage_ingress_matrix.h
@@ -97,8 +97,7 @@ namespace NKikimr {
}
ui8 CountBits() const {
- unsigned v = Vec;
- return ::PopCount(v);
+ return ::PopCount(Vec);
}
ui8 Raw() const {
@@ -134,8 +133,7 @@ namespace NKikimr {
}
TVectorType operator ~() const {
- ui8 v = ~Vec;
- return TVectorType(v, Size);
+ return TVectorType(~Vec, Size);
}
bool operator ==(const TVectorType &v) const {
@@ -174,6 +172,34 @@ namespace NKikimr {
return res;
}
+ class TIterator {
+ const TVectorType& Parts;
+ ui8 Index;
+
+ public:
+ TIterator(const TVectorType& parts, bool end)
+ : Parts(parts)
+ , Index(end ? Parts.GetSize() : Parts.FirstPosition())
+ {}
+
+ friend bool operator ==(const TIterator& x, const TIterator& y) {
+ Y_DEBUG_ABORT_UNLESS(&x.Parts == &y.Parts);
+ return x.Index == y.Index;
+ }
+
+ ui8 operator *() const {
+ return Index;
+ }
+
+ TIterator& operator ++() {
+ Index = Parts.NextPosition(Index);
+ return *this;
+ }
+ };
+
+ TIterator begin() const { return {*this, false}; }
+ TIterator end() const { return {*this, true}; }
+
private:
ui8 Vec;
ui8 Size;
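
The TIterator added above lets callers range-for over the indices of set parts, which the query and compaction code in this patch now uses instead of explicit FirstPosition()/NextPosition() loops. A self-contained model of the same idea follows; TPartMask is an illustrative stand-in, not the real NMatrix::TVectorType.

#include <cstdint>
#include <cstdio>

// 8-bit part mask whose begin()/end() iterate over the indices of set bits.
class TPartMask {
    uint8_t Vec;
    uint8_t Size;

    uint8_t Next(uint8_t i) const {              // first set position >= i, or Size
        while (i < Size && !(Vec >> i & 1)) {
            ++i;
        }
        return i;
    }

public:
    TPartMask(uint8_t vec, uint8_t size) : Vec(vec), Size(size) {}

    class TIterator {
        const TPartMask* Parts;
        uint8_t Index;
    public:
        TIterator(const TPartMask* parts, uint8_t index) : Parts(parts), Index(index) {}
        uint8_t operator*() const { return Index; }
        TIterator& operator++() { Index = Parts->Next(Index + 1); return *this; }
        friend bool operator==(const TIterator& x, const TIterator& y) { return x.Index == y.Index; }
        friend bool operator!=(const TIterator& x, const TIterator& y) { return !(x == y); }
    };

    TIterator begin() const { return {this, Next(0)}; }
    TIterator end() const { return {this, Size}; }
};

int main() {
    TPartMask parts(0b010101, 6);
    for (uint8_t i : parts) {                    // replaces FirstPosition()/NextPosition() loops
        std::printf("part index %u\n", (unsigned)i);   // prints 0, 2, 4
    }
}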
diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp
index 0ff8c5bc8fc..1ffd89f504b 100644
--- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp
+++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp
@@ -75,7 +75,6 @@ namespace NKikimr {
NHuge::TAllocChunkRecoveryLogRec HugeBlobAllocChunkRecoveryLogRec;
NHuge::TFreeChunkRecoveryLogRec HugeBlobFreeChunkRecoveryLogRec;
NHuge::TPutRecoveryLogRec HugeBlobPutRecoveryLogRec;
- TDiskPartVec HugeBlobs;
NKikimrVDiskData::TPhantomLogoBlobs PhantomLogoBlobs;
void Bootstrap(const TActorContext &ctx) {
@@ -700,9 +699,9 @@ namespace NKikimr {
if (!good)
return EDispatchStatus::Error;
- HugeBlobs = TDiskPartVec(pb.GetRemovedHugeBlobs());
ui64 lsn = record.Lsn;
- TRlas res = LocRecCtx->RepairedHuge->ApplySlotsDeletion(ctx, lsn, HugeBlobs, dbType);
+ TRlas res = LocRecCtx->RepairedHuge->ApplySlotsDeletion(ctx, lsn, TDiskPartVec(pb.GetRemovedHugeBlobs()),
+ TDiskPartVec(pb.GetAllocatedHugeBlobs()), dbType);
if (!res.Ok)
return EDispatchStatus::Error;
diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp
index 107936cde0b..cda349b5219 100644
--- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp
+++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp
@@ -521,6 +521,7 @@ namespace NKikimr {
Config->HullSstSizeInChunksLevel,
Config->HullCompFreeSpaceThreshold,
Config->FreshCompMaxInFlightWrites,
+ Config->FreshCompMaxInFlightReads,
Config->HullCompMaxInFlightWrites,
Config->HullCompMaxInFlightReads,
Config->HullCompReadBatchEfficiencyThreshold,
diff --git a/ydb/core/blobstorage/vdisk/protos/events.proto b/ydb/core/blobstorage/vdisk/protos/events.proto
index 78a70268bcd..2e98796ac20 100644
--- a/ydb/core/blobstorage/vdisk/protos/events.proto
+++ b/ydb/core/blobstorage/vdisk/protos/events.proto
@@ -60,6 +60,7 @@ message FreshSegmentStat {
message FreshStat {
uint32 compaction_writes_in_flight = 1;
+ uint32 compaction_reads_in_flight = 5;
FreshSegmentStat current = 2;
FreshSegmentStat dreg = 3;
FreshSegmentStat old = 4;
diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
index 7aff723af2c..b53078dc238 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
@@ -13,12 +13,7 @@ namespace NKikimr {
// TReadBatcher implementation
////////////////////////////////////////////////////////////////////////////
// Traverse data parts for a single key
- void TReadBatcher::StartTraverse(
- const TLogoBlobID& id,
- void *cookie,
- ui8 queryPartId,
- ui32 queryShift,
- ui32 querySize) {
+ void TReadBatcher::StartTraverse(const TLogoBlobID& id, void *cookie, ui8 queryPartId, ui32 queryShift, ui32 querySize) {
Y_DEBUG_ABORT_UNLESS(id.PartId() == 0);
Y_DEBUG_ABORT_UNLESS(!Traversing);
ClearTmpItems();
@@ -30,24 +25,18 @@ namespace NKikimr {
QueryPartId = queryPartId;
QueryShift = queryShift;
QuerySize = querySize;
- FoundDiskItems.clear();
- FoundInMemItems.clear();
}
// We have data on disk
void TReadBatcher::operator () (const TDiskPart &data, NMatrix::TVectorType parts) {
Y_DEBUG_ABORT_UNLESS(Traversing);
- FoundDiskItems.emplace_back(data, parts);
- }
-
- void TReadBatcher::ProcessFoundDiskItem(const TDiskPart &data, NMatrix::TVectorType parts) {
if (QueryPartId && !parts.Get(QueryPartId - 1)) {
return; // we have no requested part here
}
const auto& gtype = Ctx->VCtx->Top->GType;
ui32 blobSize = 0;
- for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
+ for (ui8 i : parts) {
blobSize += gtype.PartSize(TLogoBlobID(CurID, i + 1));
}
@@ -58,7 +47,7 @@ namespace NKikimr {
Y_ABORT_UNLESS(blobSize == data.Size);
}
- for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
+ for (ui8 i : parts) {
const TLogoBlobID partId(CurID, i + 1);
const ui32 partSize = gtype.PartSize(partId);
if (QueryPartId == 0 || QueryPartId == i + 1) {
@@ -84,10 +73,6 @@ namespace NKikimr {
// We have diskBlob in memory
void TReadBatcher::operator () (const TDiskBlob &diskBlob) {
Y_DEBUG_ABORT_UNLESS(Traversing);
- FoundInMemItems.push_back(diskBlob);
- }
-
- void TReadBatcher::ProcessFoundInMemItem(const TDiskBlob &diskBlob) {
if (QueryPartId == 0 || diskBlob.GetParts().Get(QueryPartId - 1)) {
// put data item iff we gather all parts OR we need a concrete part and parts contain it
for (TDiskBlob::TPartIterator it = diskBlob.begin(), e = diskBlob.end(); it != e; ++it) {
@@ -116,22 +101,12 @@ namespace NKikimr {
Y_DEBUG_ABORT_UNLESS(Traversing);
Traversing = false;
- // process found items; first, we process disk items; then, we process in-mem items that may possibly
- // overwrite disk ones to prevent read IOPS
- for (const std::tuple<TDiskPart, NMatrix::TVectorType> &diskItem : FoundDiskItems) {
- ProcessFoundDiskItem(std::get<0>(diskItem), std::get<1>(diskItem));
- }
- for (const TDiskBlob &diskBlob : FoundInMemItems) {
- ProcessFoundInMemItem(diskBlob);
- }
-
// NOTE: we may have parts that are not replicated yet;
// we MUST NOT return NO_DATA for them; but when parts are missing due to finished GC, we report NODATA
- NMatrix::TVectorType mustHave = ingress.PartsWeMustHaveLocally(Ctx->VCtx->Top.get(),
- Ctx->VCtx->ShortSelfVDisk, CurID);
- NMatrix::TVectorType actuallyHave = ingress.LocalParts(Ctx->VCtx->Top->GType);
- NMatrix::TVectorType missingParts = mustHave - actuallyHave;
- for (ui8 i = missingParts.FirstPosition(); i != missingParts.GetSize(); i = missingParts.NextPosition(i)) {
+ const auto mustHave = ingress.PartsWeMustHaveLocally(Ctx->VCtx->Top.get(), Ctx->VCtx->ShortSelfVDisk, CurID);
+ const auto actuallyHave = ingress.LocalParts(Ctx->VCtx->Top->GType);
+ const auto missingParts = mustHave - actuallyHave;
+ for (ui8 i : missingParts) {
// NOT_YET
if (QueryPartId == 0 || i + 1 == QueryPartId) {
Y_ABORT_UNLESS(TmpItems[i].Empty());
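
Two simplifications land in this file: the deferred FoundDiskItems/FoundInMemItems buffers are dropped, so disk and in-memory hits are processed directly inside the traversal callbacks, and the manual FirstPosition()/NextPosition() walks over the part bitmask become range-based loops. A minimal before/after sketch of the iteration change, assuming NMatrix::TVectorType yields the indices of set parts when iterated (as the range-for in the hunk implies):

    // before: explicit walk over set bits of the local-parts vector
    for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
        blobSize += gtype.PartSize(TLogoBlobID(CurID, i + 1));
    }

    // after: equivalent range-based form used throughout this hunk
    for (ui8 i : parts) {
        blobSize += gtype.PartSize(TLogoBlobID(CurID, i + 1));
    }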
diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.h b/ydb/core/blobstorage/vdisk/query/query_readbatch.h
index 90cdb84c8aa..280a1fea3a4 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readbatch.h
+++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.h
@@ -307,10 +307,8 @@ namespace NKikimr {
// We have data on disk
void operator () (const TDiskPart &data, NMatrix::TVectorType parts);
- void ProcessFoundDiskItem(const TDiskPart &data, NMatrix::TVectorType parts);
// We have diskBlob in memory
void operator () (const TDiskBlob &diskBlob);
- void ProcessFoundInMemItem(const TDiskBlob &diskBlob);
// Finish data traverse for a single key
void FinishTraverse(const TIngress &ingress);
// Abort traverse without giving out results
@@ -349,9 +347,6 @@ namespace NKikimr {
std::shared_ptr<TReadBatcherResult> Result;
ui64 PDiskReadBytes = 0;
- TStackVec<std::tuple<TDiskPart, NMatrix::TVectorType>, MaxTotalPartCount> FoundDiskItems;
- TStackVec<TDiskBlob, MaxTotalPartCount> FoundInMemItems;
-
NReadBatcher::TGlueRead *AddGlueRead(NReadBatcher::TDataItem *item);
void PrepareReadPlan();
TString DiskDataItemsToString() const;
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h
index e395596774f..0e1d195761a 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst.h
@@ -53,6 +53,7 @@ namespace NKikimr {
, HullDs(std::move(hullDs))
, State(EState::STOPPED)
, MinReservedChunksCount(ReplCtx->VDiskCfg->HullSstSizeInChunksFresh)
+ , Merger(ReplCtx->VCtx->Top->GType, ReplCtx->GetAddHeader())
, Arena(&TRopeArenaBackend::Allocate)
{}
@@ -148,19 +149,21 @@ namespace NKikimr {
Y_ABORT_UNLESS(record.Id > PrevID);
Y_ABORT_UNLESS(!record.Id.PartId());
- // generate merged ingress for all locally recovered parts
- TIngress ingress = TIngress::CreateFromRepl(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk,
- record.Id, record.LocalParts);
+ TMemRecLogoBlob memRec(TIngress::CreateFromRepl(ReplCtx->VCtx->Top.get(), ReplCtx->VCtx->ShortSelfVDisk,
+ record.Id, record.LocalParts));
- // add disk blob to merger (in order to write it to SSTable)
- Merger.AddBlob(TDiskBlob(&record.Data, record.LocalParts, ReplCtx->VCtx->Top->GType, record.Id));
+ // set merger state to match generated blob
+ memRec.SetType(TBlobType::DiskBlob);
+ memRec.SetDiskBlob(TDiskPart(0, 0, record.Data.GetSize()));
+ Merger.FinishFromBlob();
- // create memory record for disk blob with correct length
- TMemRecLogoBlob memRec(ingress);
- memRec.SetDiskBlob(TDiskPart(0, 0, Merger.GetDiskBlobRawSize(ReplCtx->GetAddHeader())));
-
- const bool success = Writer->Push(record.Id, memRec, &Merger);
+ TDiskPart preallocatedLocation;
+ const bool success = Writer->PushIndexOnly(record.Id, memRec, &Merger, &preallocatedLocation);
if (success) {
+ Y_DEBUG_ABORT_UNLESS(Merger.GetCollectTask().Reads.empty());
+ const TDiskPart writtenLocation = Writer->PushDataOnly(std::move(record.Data));
+ Y_ABORT_UNLESS(writtenLocation == preallocatedLocation);
+
if (auto msg = Writer->GetPendingMessage()) {
IssueWriteCmd(std::move(msg), EOutputState::INTERMEDIATE_CHUNK);
}
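
The replication writer no longer re-merges the already-serialized blob through Merger.AddBlob and Push; it pushes the index record first, receives the disk location the writer reserved, then appends the raw data and asserts the two agree. A sketch of the new two-phase flow, using the names from the hunk (the surrounding member setup is assumed):

    // index first: reserves a location for the blob inside the SST being written
    TMemRecLogoBlob memRec(TIngress::CreateFromRepl(ReplCtx->VCtx->Top.get(),
        ReplCtx->VCtx->ShortSelfVDisk, record.Id, record.LocalParts));
    memRec.SetType(TBlobType::DiskBlob);
    memRec.SetDiskBlob(TDiskPart(0, 0, record.Data.GetSize()));
    Merger.FinishFromBlob();

    TDiskPart preallocatedLocation;
    if (Writer->PushIndexOnly(record.Id, memRec, &Merger, &preallocatedLocation)) {
        // data second: must land exactly where the index record says it is
        const TDiskPart writtenLocation = Writer->PushDataOnly(std::move(record.Data));
        Y_ABORT_UNLESS(writtenLocation == preallocatedLocation);
    }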
diff --git a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp
index e4c184ed855..e07f2752f87 100644
--- a/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/repl/blobstorage_hullreplwritesst_ut.cpp
@@ -39,7 +39,7 @@ TVDiskContextPtr CreateVDiskContext(const TBlobStorageGroupInfo& info) {
TIntrusivePtr<THullCtx> CreateHullCtx(const TBlobStorageGroupInfo& info, ui32 chunkSize, ui32 compWorthReadSize) {
return MakeIntrusive<THullCtx>(CreateVDiskContext(info), chunkSize, compWorthReadSize, true, true, true, true, 1u,
- 1u, 2.0, 10u, 10u, 20u, 0.5, TDuration::Minutes(5), TDuration::Seconds(1), true);
+ 1u, 2.0, 10u, 10u, 10u, 20u, 0.5, TDuration::Minutes(5), TDuration::Seconds(1), true);
}
TIntrusivePtr<THullDs> CreateHullDs(const TBlobStorageGroupInfo& info) {
diff --git a/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp b/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp
index 077a8eaea53..ea9f9639cac 100644
--- a/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp
@@ -146,7 +146,7 @@ namespace NKikimr {
// filter out the read queue -- remove items that are already available or not needed
auto remove = [](const TReadCmd& item) {
const NMatrix::TVectorType missing = item.Item->Needed - item.Item->GetAvailableParts();
- return (missing & item.Parts).Empty() || item.Location == item.Item->CorruptedPart;
+ return (missing & item.Parts).Empty() || item.Location.Includes(item.Item->CorruptedPart);
};
ReadQ.erase(std::remove_if(ReadQ.begin(), ReadQ.end(), remove), ReadQ.end());
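
The filter predicate now drops a queued read whenever its location contains the corrupted part, not only when the two locations are identical. A hypothetical containment check, purely to illustrate the intended predicate (the real TDiskPart::Includes is not shown in this patch and may differ):

    // illustrative only: 'outer' covers 'inner' iff they share a chunk and the byte range encloses it
    bool Includes(const TDiskPart& outer, const TDiskPart& inner) {
        return outer.ChunkIdx == inner.ChunkIdx
            && inner.Offset >= outer.Offset
            && inner.Offset + inner.Size <= outer.Offset + outer.Size;
    }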
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 54e62b22f40..0832fbc6234 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -206,6 +206,9 @@ namespace NKikimr {
if (Config->RunRepl) {
ctx.Send(Db->ReplID, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes));
}
+ if (Hull) {
+ Hull->ApplyHugeBlobSize(MinREALHugeBlobInBytes, ctx);
+ }
ctx.Send(*SkeletonFrontIDPtr, new TEvMinHugeBlobSizeUpdate(MinREALHugeBlobInBytes));
}
}
@@ -1861,6 +1864,7 @@ namespace NKikimr {
if (ev->Get()->Status == NKikimrProto::OK) {
ApplyHugeBlobSize(Config->MinHugeBlobInBytes);
Y_ABORT_UNLESS(MinREALHugeBlobInBytes);
+
// handle special case when donor disk starts and finds out that it has been wiped out
if (ev->Get()->LsnMngr->GetOriginallyRecoveredLsn() == 0 && Config->BaseInfo.DonorMode) {
// send drop donor cmd to NodeWarden
@@ -1939,11 +1943,11 @@ namespace NKikimr {
Db->HugeKeeperID);
// create Hull
- Hull = std::make_shared<THull>(Db->LsnMngr, PDiskCtx, Db->SkeletonID,
- Config->BalancingEnableDelete, std::move(*ev->Get()->Uncond),
- ctx.ExecutorThread.ActorSystem, Config->BarrierValidation);
+ Hull = std::make_shared<THull>(Db->LsnMngr, PDiskCtx, HugeBlobCtx, MinREALHugeBlobInBytes,
+ Db->SkeletonID, Config->BalancingEnableDelete, std::move(*ev->Get()->Uncond),
+ ctx.ExecutorThread.ActorSystem, Config->BarrierValidation, Db->HugeKeeperID);
ActiveActors.Insert(Hull->RunHullServices(Config, HullLogCtx, Db->SyncLogFirstLsnToKeep,
- Db->LoggerID, Db->LogCutterID, ctx), ctx, NKikimrServices::BLOBSTORAGE);
+ Db->LoggerID, Db->LogCutterID, ctx), ctx, NKikimrServices::BLOBSTORAGE);
// create VDiskCompactionState
VDiskCompactionState = std::make_unique<TVDiskCompactionState>(Hull->GetHullDs()->LogoBlobs->LIActor,
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h
index 09cb0d95ac3..70a5528eeb4 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_capturevdisklayout.h
@@ -34,6 +34,7 @@ namespace NKikimr {
// then, scan all the blobs
struct TMerger {
TRes& Res;
+ ui64 SstId;
ui32 Level;
static constexpr bool HaveToMergeData() { return true; }
@@ -43,7 +44,7 @@ namespace NKikimr {
void Clear() {}
void AddFromSegment(const TMemRecLogoBlob& memRec, const TDiskPart *outbound, const TKeyLogoBlob& key,
- ui64 circaLsn) {
+ ui64 /*circaLsn*/) {
TDiskDataExtractor extr;
memRec.GetDiskData(&extr, outbound);
const TRes::ERecordType recordType = extr.BlobType == TBlobType::DiskBlob
@@ -52,7 +53,7 @@ namespace NKikimr {
for (const TDiskPart *location = extr.Begin; location != extr.End; ++location) {
if (location->ChunkIdx && location->Size) {
Res.Layout.push_back({*location, TRes::EDatabase::LogoBlobs, recordType, key.LogoBlobID(),
- circaLsn, Level});
+ SstId, Level});
}
}
}
@@ -66,7 +67,7 @@ namespace NKikimr {
};
auto logoBlobsCallback = [&](const auto& sst, ui32 level) {
- TMerger merger{*res, level};
+ TMerger merger{*res, sst->AssignedSstId, level};
TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob>::TMemIterator iter(sst.Get());
for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
iter.PutToMerger(&merger);
@@ -79,7 +80,7 @@ namespace NKikimr {
traverse(Snap.BlocksSnap.SliceSnap, ignoreCallback, TRes::EDatabase::Blocks);
traverse(Snap.BarriersSnap.SliceSnap, ignoreCallback, TRes::EDatabase::Barriers);
- TMerger merger{*res, Max<ui32>()};
+ TMerger merger{*res, 0, Max<ui32>()};
TFreshDataSnapshot<TKeyLogoBlob, TMemRecLogoBlob>::TForwardIterator iter(Snap.HullCtx, &Snap.LogoBlobsSnap.FreshSnap);
THeapIterator<TKeyLogoBlob, TMemRecLogoBlob, true> heapIt(&iter);
heapIt.Walk(TKeyLogoBlob::First(), &merger, [] (TKeyLogoBlob /*key*/, auto* /*merger*/) { return true; });
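
Captured layout records are now attributed to the SST they belong to instead of the per-record circaLsn: level segments pass their AssignedSstId into the merger, while fresh data uses 0, as the last hunk shows. A short sketch of how the tag is chosen, using only names from this file's hunks:

    // level segments: one merger per SST, tagged with that SST's id
    TMerger sstMerger{*res, sst->AssignedSstId, level};

    // fresh (not yet compacted) data: no SST exists, so the tag is 0
    TMerger freshMerger{*res, 0, Max<ui32>()};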
diff --git a/ydb/core/protos/blobstorage_vdisk_internal.proto b/ydb/core/protos/blobstorage_vdisk_internal.proto
index 69f3b49235c..9fa2877716e 100644
--- a/ydb/core/protos/blobstorage_vdisk_internal.proto
+++ b/ydb/core/protos/blobstorage_vdisk_internal.proto
@@ -72,6 +72,7 @@ message THullDbEntryPoint {
optional TLevelIndex LevelIndex = 1;
optional TDiskPartVec RemovedHugeBlobs = 2;
+ optional TDiskPartVec AllocatedHugeBlobs = 4;
// obsolete field
repeated TUncommittedRemovedHugeBlob UncommittedRemovedHugeBlobs = 3;