aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Belyakov <serg-belyakov@ydb.tech>2024-05-22 15:21:38 +0300
committerGitHub <noreply@github.com>2024-05-22 15:21:38 +0300
commita26dfc6e3aa6cb6720ecb4ddb419709cac8bf082 (patch)
tree4fc921f964bf97fe7e39a5449ce6c59fc6dc572f
parent61d0cf5c36f7c4358c190ad1bac9b0c56df8b4db (diff)
downloadydb-a26dfc6e3aa6cb6720ecb4ddb419709cac8bf082.tar.gz
Introduce local SyncLog data cutter (#4124)
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.cpp11
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_impl.h11
-rw-r--r--ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp10
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h2
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/sync.cpp101
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/ut_helpers.h5
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_config.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_config.h5
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp13
-rw-r--r--ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp9
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp114
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h6
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp35
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp91
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h99
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp5
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h15
-rw-r--r--ydb/core/protos/config.proto29
-rw-r--r--ydb/core/quoter/ut_helpers.cpp2
-rw-r--r--ydb/core/sys_view/partition_stats/partition_stats_ut.cpp2
-rw-r--r--ydb/core/tablet_flat/test/libs/exec/runner.h2
-rw-r--r--ydb/core/testlib/actors/test_runtime.cpp4
-rw-r--r--ydb/core/testlib/actors/test_runtime.h2
-rw-r--r--ydb/core/testlib/actors/test_runtime_ut.cpp2
-rw-r--r--ydb/core/testlib/basics/appdata.cpp9
-rw-r--r--ydb/core/testlib/basics/appdata.h3
-rw-r--r--ydb/core/testlib/basics/helpers.h2
-rw-r--r--ydb/core/testlib/basics/services.cpp21
-rw-r--r--ydb/core/testlib/tenant_runtime.cpp5
-rw-r--r--ydb/core/testlib/test_client.cpp7
-rw-r--r--ydb/library/ncloud/impl/access_service_ut.cpp2
-rw-r--r--ydb/library/services/services.proto1
32 files changed, 518 insertions, 109 deletions
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
index 682bf9cd94..0008414141 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
@@ -169,8 +169,15 @@ void TNodeWarden::Bootstrap() {
DsProxyPerPoolCounters = new TDsProxyPerPoolCounters(AppData()->Counters);
if (actorSystem && actorSystem->AppData<TAppData>() && actorSystem->AppData<TAppData>()->Icb) {
- actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
- actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
+ const TIntrusivePtr<NKikimr::TControlBoard>& icb = actorSystem->AppData<TAppData>()->Icb;
+
+ icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
+ icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
+ icb->RegisterSharedControl(EnableLocalSyncLogDataCutting, "VDiskControls.EnableLocalSyncLogDataCutting");
+ icb->RegisterSharedControl(EnableSyncLogChunkCompressionHDD, "VDiskControls.EnableSyncLogChunkCompressionHDD");
+ icb->RegisterSharedControl(EnableSyncLogChunkCompressionSSD, "VDiskControls.EnableSyncLogChunkCompressionSSD");
+ icb->RegisterSharedControl(MaxSyncLogChunksInFlightHDD, "VDiskControls.MaxSyncLogChunksInFlightHDD");
+ icb->RegisterSharedControl(MaxSyncLogChunksInFlightSSD, "VDiskControls.MaxSyncLogChunksInFlightSSD");
}
// start replication broker
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
index 985680808f..b0dc88d2fe 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h
+++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h
@@ -133,6 +133,12 @@ namespace NKikimr::NStorage {
TControlWrapper EnablePutBatching;
TControlWrapper EnableVPatch;
+ TControlWrapper EnableLocalSyncLogDataCutting;
+ TControlWrapper EnableSyncLogChunkCompressionHDD;
+ TControlWrapper EnableSyncLogChunkCompressionSSD;
+ TControlWrapper MaxSyncLogChunksInFlightHDD;
+ TControlWrapper MaxSyncLogChunksInFlightSSD;
+
TReplQuoter::TPtr ReplNodeRequestQuoter;
TReplQuoter::TPtr ReplNodeResponseQuoter;
@@ -148,6 +154,11 @@ namespace NKikimr::NStorage {
: Cfg(cfg)
, EnablePutBatching(Cfg->FeatureFlags.GetEnablePutBatchingForBlobStorage(), false, true)
, EnableVPatch(Cfg->FeatureFlags.GetEnableVPatch(), false, true)
+ , EnableLocalSyncLogDataCutting(0, 0, 1)
+ , EnableSyncLogChunkCompressionHDD(1, 0, 1)
+ , EnableSyncLogChunkCompressionSSD(0, 0, 1)
+ , MaxSyncLogChunksInFlightHDD(10, 1, 1024)
+ , MaxSyncLogChunksInFlightSSD(10, 1, 1024)
{
Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1);
AvailDomainId = 1;
diff --git a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
index 7a1fa34dbe..47ebb27134 100644
--- a/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
+++ b/ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
@@ -174,6 +174,16 @@ namespace NKikimr::NStorage {
vdiskConfig->EnableVDiskCooldownTimeout = Cfg->EnableVDiskCooldownTimeout;
vdiskConfig->ReplPausedAtStart = Cfg->VDiskReplPausedAtStart;
vdiskConfig->EnableVPatch = EnableVPatch;
+
+ vdiskConfig->EnableLocalSyncLogDataCutting = EnableLocalSyncLogDataCutting;
+ if (deviceType == NPDisk::EDeviceType::DEVICE_TYPE_ROT) {
+ vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionHDD;
+ vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightHDD;
+ } else {
+ vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionSSD;
+ vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightSSD;
+ }
+
vdiskConfig->FeatureFlags = Cfg->FeatureFlags;
if (Cfg->BlobStorageConfig.HasCostMetricsSettings()) {
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
index f1b5d2b0c0..0012985963 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
@@ -72,7 +72,7 @@ public:
appData->IoContextFactory = IoContext.get();
Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
- Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}});
+ Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}, {}});
Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG);
diff --git a/ydb/core/blobstorage/ut_blobstorage/sync.cpp b/ydb/core/blobstorage/ut_blobstorage/sync.cpp
index d092eac9b9..41a32501a7 100644
--- a/ydb/core/blobstorage/ut_blobstorage/sync.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/sync.cpp
@@ -1,7 +1,108 @@
+#include <ydb/core/blobstorage/ut_blobstorage/ut_helpers.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_private_events.h>
+#include <util/random/random.h>
Y_UNIT_TEST_SUITE(BlobStorageSync) {
+
+ void TestCutting(TBlobStorageGroupType groupType) {
+ const ui32 groupSize = groupType.BlobSubgroupSize();
+
+ // for (ui32 mask = 0; mask < (1 << groupSize); ++mask) { // TIMEOUT
+ {
+ ui32 mask = RandomNumber(1ull << groupSize);
+ for (bool compressChunks : { true, false }) {
+ TEnvironmentSetup env{{
+ .NodeCount = groupSize,
+ .Erasure = groupType,
+ }};
+
+ env.CreateBoxAndPool(1, 1);
+ std::vector<ui32> groups = env.GetGroups();
+ UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
+ ui32 groupId = groups[0];
+
+ const ui64 tabletId = 5000;
+ const ui32 channel = 10;
+ ui32 gen = 1;
+ ui32 step = 1;
+ ui64 cookie = 1;
+
+ ui64 totalSize = 0;
+
+ std::vector<TControlWrapper> cutLocalSyncLogControls;
+ std::vector<TControlWrapper> compressChunksControls;
+ std::vector<TActorId> edges;
+
+ for (ui32 nodeId = 1; nodeId <= groupSize; ++nodeId) {
+ cutLocalSyncLogControls.emplace_back(0, 0, 1);
+ compressChunksControls.emplace_back(1, 0, 1);
+ TAppData* appData = env.Runtime->GetNode(nodeId)->AppData.get();
+ appData->Icb->RegisterSharedControl(cutLocalSyncLogControls.back(), "VDiskControls.EnableLocalSyncLogDataCutting");
+ appData->Icb->RegisterSharedControl(compressChunksControls.back(), "VDiskControls.EnableSyncLogChunkCompressionHDD");
+ edges.push_back(env.Runtime->AllocateEdgeActor(nodeId));
+ }
+
+ for (ui32 i = 0; i < groupSize; ++i) {
+ env.Runtime->WrapInActorContext(edges[i], [&] {
+ SendToBSProxy(edges[i], groupId, new TEvBlobStorage::TEvStatus(TInstant::Max()));
+ });
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(edges[i], false);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
+ }
+
+ auto writeBlob = [&](ui32 nodeId, ui32 blobSize) {
+ TLogoBlobID blobId(tabletId, gen, step, channel, blobSize, ++cookie);
+ totalSize += blobSize;
+ TString data = MakeData(blobSize);
+
+ const TActorId& sender = edges[nodeId - 1];
+ env.Runtime->WrapInActorContext(sender, [&] () {
+ SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvPut(blobId, std::move(data), TInstant::Max()));
+ });
+ };
+
+ env.Runtime->FilterFunction = [&](ui32/* nodeId*/, std::unique_ptr<IEventHandle>& ev) {
+ switch(ev->Type) {
+ case TEvBlobStorage::TEvPutResult::EventType:
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get<TEvBlobStorage::TEvPutResult>()->Status, NKikimrProto::OK);
+ return false;
+ default:
+ return true;
+ }
+ };
+
+ while (totalSize < 16_MB) {
+ writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB));
+ }
+ env.Sim(TDuration::Minutes(5));
+
+ for (ui32 i = 0; i < groupSize; ++i) {
+ cutLocalSyncLogControls[i] = !!(mask & (1 << i));
+ compressChunksControls[i] = compressChunks;
+ }
+
+ while (totalSize < 32_MB) {
+ writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB));
+ }
+
+ env.Sim(TDuration::Minutes(5));
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestSyncLogCuttingMirror3dc) {
+ TestCutting(TBlobStorageGroupType::ErasureMirror3dc);
+ }
+
+ Y_UNIT_TEST(TestSyncLogCuttingMirror3of4) {
+ TestCutting(TBlobStorageGroupType::ErasureMirror3of4);
+ }
+
+ Y_UNIT_TEST(TestSyncLogCuttingBlock4Plus2) {
+ TestCutting(TBlobStorageGroupType::Erasure4Plus2Block);
+ }
+
Y_UNIT_TEST(SyncWhenDiskGetsDown) {
return; // re-enable when protocol issue is resolved
diff --git a/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h b/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h
index 5fb44fa60a..163aa45126 100644
--- a/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h
+++ b/ydb/core/blobstorage/ut_blobstorage/ut_helpers.h
@@ -7,6 +7,11 @@ namespace NKikimr {
TString MakeData(ui32 dataSize);
+template<typename Int1 = ui32, typename Int2 = ui32>
+inline Int1 GenerateRandom(Int1 min, Int2 max) {
+ return min + RandomNumber(max - min);
+}
+
class TInflightActor : public TActorBootstrapped<TInflightActor> {
public:
struct TSettings {
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
index f4a2113a5b..8a0f784f6c 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
@@ -63,6 +63,8 @@ namespace NKikimr {
SyncLogAdvisedIndexedBlockSize = ui32(1) << ui32(20); // 1 MB
SyncLogMaxMemAmount = ui64(64) << ui64(20); // 64 MB
+ MaxSyncLogChunkSize = ui32(16) << ui32(10); // 32 Kb
+
ReplTimeInterval = TDuration::Seconds(60); // 60 seconds
ReplRequestTimeout = TDuration::Seconds(10); // 10 seconds
ReplPlanQuantum = TDuration::MilliSeconds(100); // 100 ms
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_config.h b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
index a2335182c4..35a9ec4e68 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_config.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_config.h
@@ -154,6 +154,11 @@ namespace NKikimr {
ui32 SyncLogAdvisedIndexedBlockSize;
ui64 SyncLogMaxMemAmount;
+ TControlWrapper EnableLocalSyncLogDataCutting;
+ TControlWrapper EnableSyncLogChunkCompression;
+ TControlWrapper MaxSyncLogChunksInFlight;
+ ui32 MaxSyncLogChunkSize;
+
///////////// REPL SETTINGS /////////////////////////
TDuration ReplTimeInterval;
TDuration ReplRequestTimeout;
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
index 393c966eea..fedaca0421 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
@@ -513,14 +513,11 @@ namespace NKikimr {
///////////////// SYNC //////////////////////////////////////////////////////
TLsnSeg THull::AllocateLsnForSyncDataCmd(const TString &data) {
- // count number of elements
- ui32 counter = 0;
- auto count = [&counter] (const void *) { counter++; };
- // do job - count all elements
- NSyncLog::TFragmentReader(data).ForEach(count, count, count, count);
+ NSyncLog::TFragmentReader fragment(data);
// allocate LsnSeg; we reserve a diapason of lsns since we put multiple records
- ui64 lsnAdvance = counter;
+ std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords();
+ ui64 lsnAdvance = records.size();
Y_ABORT_UNLESS(lsnAdvance > 0);
auto seg = Fields->LsnMngr->AllocLsnForHull(lsnAdvance);
@@ -536,7 +533,9 @@ namespace NKikimr {
curLsn++;
};
// do job - update blocks cache
- NSyncLog::TFragmentReader(data).ForEach(otherHandler, blockHandler, otherHandler, blockHandlerV2);
+ for (const NSyncLog::TRecordHdr* rec : records) {
+ NSyncLog::HandleRecordHdr(rec, otherHandler, blockHandler, otherHandler, blockHandlerV2);
+ }
// check that all records are applied
Y_DEBUG_ABORT_UNLESS(curLsn == seg.Last + 1);
diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp
index ee7022da11..fb7f0875a3 100644
--- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp
+++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp
@@ -429,10 +429,9 @@ namespace NKikimr {
void ApplySyncDataByRecord(const TActorContext &ctx, ui64 recordLsn) {
// count number of records
- ui64 recsNum = 0;
- auto count = [&recsNum] (const void *) { recsNum++; };
NSyncLog::TFragmentReader fragment(LocalSyncDataMsg.Data);
- fragment.ForEach(count, count, count, count);
+ std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords();
+ ui64 recsNum = records.size();
// calculate lsn
Y_DEBUG_ABORT_UNLESS(recordLsn >= recsNum, "recordLsn# %" PRIu64 " recsNum# %" PRIu64,
@@ -465,7 +464,9 @@ namespace NKikimr {
};
// apply local sync data
- fragment.ForEach(blobHandler, blockHandler, barrierHandler, blockHandlerV2);
+ for (const NSyncLog::TRecordHdr* rec : records) {
+ NSyncLog::HandleRecordHdr(rec, blobHandler, blockHandler, barrierHandler, blockHandlerV2);
+ }
}
void PutLogoBlobsBatchToHull(
diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp
index 2173cf4c64..ba013028ed 100644
--- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp
+++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp
@@ -1,5 +1,6 @@
#include "blobstorage_syncer_localwriter.h"
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h>
+#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h>
namespace NKikimr {
@@ -186,5 +187,118 @@ namespace NKikimr {
return new TLocalSyncDataExtractorActor(vctx, skeletonId, parentId, std::move(ev));
}
+ ///////////////////////////////////////////////////////////////////////////////////////////////
+ // TLocalSyncDataCutterActor -- actor extracts data from TEvLocalSyncData, cuts it into
+ // smaller chunks and sends in multiple messages to Skeleton
+ ///////////////////////////////////////////////////////////////////////////////////////////////
+ class TLocalSyncDataCutterActor : public TActorBootstrapped<TLocalSyncDataCutterActor> {
+ TIntrusivePtr<TVDiskConfig> VConfig;
+ TIntrusivePtr<TVDiskContext> VCtx;
+ TActorId SkeletonId;
+ TActorId ParentId;
+ std::unique_ptr<TEvLocalSyncData> Ev;
+ std::vector<TString> Chunks;
+
+ ui32 ChunksInFlight = 0;
+ bool CompressChunks;
+ ui32 MaxChunksInFlight;
+ ui32 MaxChunksSize;
+
+ public:
+ void Bootstrap(const TActorContext&) {
+ THPTimer timer;
+ std::unique_ptr<NSyncLog::TNaiveFragmentWriter> fragmentWriter;
+
+ if (CompressChunks) {
+ fragmentWriter.reset(new NSyncLog::TLz4FragmentWriter);
+ } else {
+ fragmentWriter.reset(new NSyncLog::TNaiveFragmentWriter);
+ }
+
+ auto addChunk = [&]() {
+ if (fragmentWriter->GetSize()) {
+ TString chunk;
+ fragmentWriter->Finish(&chunk);
+ Chunks.emplace_back(std::move(chunk));
+ fragmentWriter->Clear();
+ }
+ };
+
+ NSyncLog::TFragmentReader fragmentReader(Ev->Data);
+ std::vector<const NSyncLog::TRecordHdr*> records = fragmentReader.ListRecords();
+ for (const NSyncLog::TRecordHdr* rec : records) {
+ if (fragmentWriter->GetSize() + rec->GetSize() > MaxChunksSize) {
+ addChunk();
+ }
+ fragmentWriter->Push(rec, rec->GetSize());
+ }
+ addChunk();
+
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, VCtx->VDiskLogPrefix
+ << "TLocalSyncDataCutterActor: VDiskId# " << Ev->VDiskID.ToString()
+ << " dataSize# " << Ev->Data.size()
+ << " duration# " << TDuration::Seconds(timer.Passed()));
+
+ Become(&TThis::StateFunc);
+ SendChunks();
+ }
+
+ void Finish(const NKikimrProto::EReplyStatus& status) {
+ Send(ParentId, new TEvLocalSyncDataResult(status, TAppData::TimeProvider->Now(), nullptr, nullptr));
+ PassAway();
+ }
+
+ void Handle(const TEvLocalSyncDataResult::TPtr& ev) {
+ if (ev->Get()->Status == NKikimrProto::OK) {
+ --ChunksInFlight;
+ if (Chunks.empty() && ChunksInFlight == 0) {
+ Finish(NKikimrProto::OK);
+ } else {
+ SendChunks();
+ }
+ } else {
+ Finish(ev->Get()->Status);
+ }
+ }
+
+ void SendChunks() {
+ while (ChunksInFlight < MaxChunksInFlight && !Chunks.empty()) {
+ Send(SkeletonId, new TEvLocalSyncData(Ev->VDiskID, Ev->SyncState, std::move(Chunks.back())));
+ Chunks.pop_back();
+ ++ChunksInFlight;
+ }
+ }
+
+ public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::VDISK_LOCALSYNCDATA_CUTTER;
+ }
+
+ TLocalSyncDataCutterActor(
+ const TIntrusivePtr<TVDiskConfig>& vconfig,
+ const TIntrusivePtr<TVDiskContext>& vctx,
+ const TActorId& skeletonId,
+ const TActorId& parentId,
+ std::unique_ptr<TEvLocalSyncData> ev)
+ : VCtx(vctx)
+ , SkeletonId(skeletonId)
+ , ParentId(parentId)
+ , Ev(std::move(ev))
+ , CompressChunks(vconfig->MaxSyncLogChunksInFlight)
+ , MaxChunksInFlight(vconfig->MaxSyncLogChunksInFlight)
+ , MaxChunksSize(vconfig->MaxSyncLogChunkSize)
+ {}
+
+ STRICT_STFUNC(StateFunc, {
+ hFunc(TEvLocalSyncDataResult, Handle);
+ })
+
+ };
+
+ IActor* CreateLocalSyncDataCutter(const TIntrusivePtr<TVDiskConfig>& vconfig, const TIntrusivePtr<TVDiskContext>& vctx,
+ const TActorId& skeletonId, const TActorId& parentId, std::unique_ptr<TEvLocalSyncData> ev) {
+ return new TLocalSyncDataCutterActor(vconfig, vctx, skeletonId, parentId, std::move(ev));
+ }
+
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h
index 1bb55b311f..779acfd31a 100644
--- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h
+++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h
@@ -69,4 +69,10 @@ namespace NKikimr {
IActor *CreateLocalSyncDataExtractor(const TIntrusivePtr<TVDiskContext> &vctx, const TActorId &skeletonId,
const TActorId &parentId, std::unique_ptr<TEvLocalSyncData> ev);
+ ///////////////////////////////////////////////////////////////////////////////////////////////
+ // CreateLocalSyncDataCutter
+ ///////////////////////////////////////////////////////////////////////////////////////////////
+ IActor* CreateLocalSyncDataCutter(const TIntrusivePtr<TVDiskConfig>& vconfig, const TIntrusivePtr<TVDiskContext>& vctx,
+ const TActorId& skeletonId, const TActorId& parentId, std::unique_ptr<TEvLocalSyncData> ev);
+
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp b/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp
index 29ec0237ed..7378a171fa 100644
--- a/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp
+++ b/ydb/core/blobstorage/vdisk/syncer/syncer_job_task.cpp
@@ -193,14 +193,21 @@ namespace NKikimr {
auto syncState = GetCurrent().SyncState;
auto msg = std::make_unique<TEvLocalSyncData>(vdisk, syncState, data);
+ if (Ctx->SyncerCtx->Config->EnableLocalSyncLogDataCutting) {
+ std::unique_ptr<IActor> actor(CreateLocalSyncDataCutter(Ctx->SyncerCtx->Config,
+ Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, parentId, std::move(msg)));
+ return TSjOutcome::Actor(std::move(actor), true);
+ } else {
+
#ifdef UNPACK_LOCALSYNCDATA
- std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId,
- parentId, std::move(msg)));
- return TSjOutcome::Actor(actor.Release(), true);
+ std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId,
+ parentId, std::move(msg)));
+ return TSjOutcome::Actor(actor.Release(), true);
#else
- Y_UNUSED(parentId);
- return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg));
+ Y_UNUSED(parentId);
+ return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg));
#endif
+ }
} else {
if (!EndOfStream) {
Ctx->SyncerCtx->VCtx->Logger(NActors::NLog::PRI_ERROR, BS_SYNCER,
@@ -331,14 +338,22 @@ namespace NKikimr {
// SyncState position.
Phase = EWaitLocal;
auto msg = std::make_unique<TEvLocalSyncData>(vdisk, OldSyncState, data);
+
+ if (Ctx->SyncerCtx->Config->EnableLocalSyncLogDataCutting) {
+ std::unique_ptr<IActor> actor(CreateLocalSyncDataCutter(Ctx->SyncerCtx->Config,
+ Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId, parentId, std::move(msg)));
+ return TSjOutcome::Actor(std::move(actor), true);
+ } else {
+ auto msg = std::make_unique<TEvLocalSyncData>(vdisk, OldSyncState, data);
#ifdef UNPACK_LOCALSYNCDATA
- std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId,
- parentId, std::move(msg)));
- return TSjOutcome::Actor(actor.Release(), true);
+ std::unique_ptr<IActor> actor(CreateLocalSyncDataExtractor(Ctx->SyncerCtx->VCtx, Ctx->SyncerCtx->SkeletonId,
+ parentId, std::move(msg)));
+ return TSjOutcome::Actor(actor.Release(), true);
#else
- Y_UNUSED(parentId);
- return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg));
+ Y_UNUSED(parentId);
+ return TSjOutcome::Event(Ctx->SyncerCtx->SkeletonId, std::move(msg));
#endif
+ }
} else {
if (!EndOfStream) {
Ctx->SyncerCtx->VCtx->Logger(NActors::NLog::PRI_ERROR, BS_SYNCER,
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp
index e356f0a205..d6a86cff3d 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.cpp
@@ -11,29 +11,13 @@ namespace NKikimr {
ForEach(Data, fblob, fblock, fbar, fblock2);
}
- void TNaiveFragmentReader::ForEach(const TString &d, TReadLogoBlobRec fblob, TReadBlockRec fblock,
- TReadBarrierRec fbar, TReadBlockRecV2 fblock2) {
- const TRecordHdr *begin = (const TRecordHdr *)(d.data());
- const TRecordHdr *end = (const TRecordHdr *)(d.data() + d.size());
-
- for (const TRecordHdr *it = begin; it < end; it = it->Next()) {
- switch (it->RecType) {
- case TRecordHdr::RecLogoBlob:
- fblob(it->GetLogoBlob());
- break;
- case TRecordHdr::RecBlock:
- fblock(it->GetBlock());
- break;
- case TRecordHdr::RecBarrier:
- fbar(it->GetBarrier());
- break;
- case TRecordHdr::RecBlockV2:
- fblock2(it->GetBlockV2());
- break;
- default:
- Y_ABORT("Unknown RecType: %s", it->ToString().data());
- }
- }
+ std::vector<const TRecordHdr*> TNaiveFragmentReader::ListRecords() {
+ std::vector<const TRecordHdr*> records = {};
+ TWriteRecordToList writeToList{records};
+
+ ForEach(Data, writeToList, writeToList, writeToList, writeToList);
+
+ return std::move(records);
}
bool TNaiveFragmentReader::Check(TString &errorString) {
@@ -62,55 +46,30 @@ namespace NKikimr {
}
}
+ std::vector<const TRecordHdr*> TLz4FragmentReader::ListRecords() {
+ Decompress();
+ std::vector<const TRecordHdr*> records = {};
+ TWriteRecordToList writeToList{records};
+
+ TNaiveFragmentReader::ForEach(Uncompressed, writeToList, writeToList, writeToList, writeToList);
+
+ return std::move(records);
+ }
+
////////////////////////////////////////////////////////////////////////////
// TBaseOrderedFragmentReader
////////////////////////////////////////////////////////////////////////////
- void TBaseOrderedFragmentReader::ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) {
- Decompress();
- using THeapItem = std::tuple<TRecordHdr::ESyncLogRecType, const void*, ui32>;
- auto comp = [](const THeapItem& x, const THeapItem& y) { return std::get<2>(y) < std::get<2>(x); };
+ void TBaseOrderedFragmentReader::ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) {
+ return ForEachImpl(fblob, fblock, fbar, fblock2);
+ }
- TStackVec<THeapItem, 4> heap;
+ std::vector<const TRecordHdr*> TBaseOrderedFragmentReader::ListRecords() {
+ std::vector<const TRecordHdr*> records;
+ TWriteRecordToList writeToList{records};
-#define ADD_HEAP(NAME, TYPE) \
- if (!Records.NAME.empty()) { \
- heap.emplace_back(TRecordHdr::TYPE, &Records.NAME.front(), Records.NAME.front().Counter); \
- }
- ADD_HEAP(LogoBlobs, RecLogoBlob)
- ADD_HEAP(Blocks, RecBlock)
- ADD_HEAP(Barriers, RecBarrier)
- ADD_HEAP(BlocksV2, RecBlockV2)
-
- std::make_heap(heap.begin(), heap.end(), comp);
-
- while (!heap.empty()) {
- std::pop_heap(heap.begin(), heap.end(), comp);
- auto& item = heap.back(); // say thanks to Microsoft compiler for not supporting tuple binding correctly
- auto& type = std::get<0>(item);
- auto& ptr = std::get<1>(item);
- auto& counter = std::get<2>(item);
- switch (type) {
-#define PROCESS(NAME, TYPE, FUNC) \
- case TRecordHdr::TYPE: { \
- using T = std::decay_t<decltype(Records.NAME)>::value_type; \
- const T *item = static_cast<const T*>(ptr); \
- FUNC(item); \
- if (++item != Records.NAME.data() + Records.NAME.size()) { \
- ptr = item; \
- counter = item->Counter; \
- std::push_heap(heap.begin(), heap.end(), comp); \
- } else { \
- heap.pop_back(); \
- } \
- break; \
- }
- PROCESS(LogoBlobs, RecLogoBlob, fblob)
- PROCESS(Blocks, RecBlock, fblock)
- PROCESS(Barriers, RecBarrier, fbar)
- PROCESS(BlocksV2, RecBlockV2, fblock2)
- }
- }
+ ForEachImpl(writeToList, writeToList, writeToList, writeToList);
+ return std::move(records);
}
////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h
index d3c7780e92..c89a656fab 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h
@@ -15,6 +15,35 @@ namespace NKikimr {
using TReadBarrierRec = std::function<void(const TBarrierRec *)>;
using TReadBlockRecV2 = std::function<void(const TBlockRecV2 *)>;
+ struct TWriteRecordToList {
+ void operator()(const void* ptr) {
+ Records.push_back((const TRecordHdr*)((char*)ptr - sizeof(TRecordHdr)));
+ };
+
+ std::vector<const TRecordHdr*>& Records;
+ };
+
+
+ template <class TLogoBlobCallback, class TBlockCallback, class TReadBarrierCallback, class TBlockV2Callback>
+ void HandleRecordHdr(const TRecordHdr* rec, TLogoBlobCallback fblob, TBlockCallback fblock, TReadBarrierCallback fbar,
+ TBlockV2Callback fblock2) {
+ switch (rec->RecType) {
+ case TRecordHdr::RecLogoBlob:
+ fblob(rec->GetLogoBlob());
+ break;
+ case TRecordHdr::RecBlock:
+ fblock(rec->GetBlock());
+ break;
+ case TRecordHdr::RecBarrier:
+ fbar(rec->GetBarrier());
+ break;
+ case TRecordHdr::RecBlockV2:
+ fblock2(rec->GetBlockV2());
+ break;
+ default:
+ Y_ABORT("Unknown RecType: %s", rec->ToString().data());
+ }
+ }
////////////////////////////////////////////////////////////////////////////
// IFragmentReader
@@ -23,6 +52,7 @@ namespace NKikimr {
public:
virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) = 0;
virtual bool Check(TString &errorString) = 0;
+ virtual std::vector<const TRecordHdr*> ListRecords() = 0;
virtual ~IFragmentReader() {}
};
@@ -38,10 +68,21 @@ namespace NKikimr {
virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) override;
virtual bool Check(TString &errorString) override;
+ virtual std::vector<const TRecordHdr*> ListRecords() override;
protected:
const TString &Data;
- void ForEach(const TString &d, TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2);
+
+ template <class TLogoBlobCallback, class TBlockCallback, class TReadBarrierCallback, class TBlockV2Callback>
+ void ForEach(const TString &d, TLogoBlobCallback fblob, TBlockCallback fblock, TReadBarrierCallback fbar,
+ TBlockV2Callback fblock2) {
+ const TRecordHdr* begin = (const TRecordHdr*)(d.data());
+ const TRecordHdr* end = (const TRecordHdr*)(d.data() + d.size());
+
+ for (const TRecordHdr* it = begin; it < end; it = it->Next()) {
+ HandleRecordHdr(it, fblob, fblock, fbar, fblock2);
+ }
+ }
};
@@ -57,6 +98,7 @@ namespace NKikimr {
virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) override;
virtual bool Check(TString &errorString) override;
+ virtual std::vector<const TRecordHdr*> ListRecords() override;
private:
mutable TString Uncompressed;
@@ -69,9 +111,60 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
class TBaseOrderedFragmentReader : public IFragmentReader {
public:
+ virtual std::vector<const TRecordHdr*> ListRecords() override;
virtual void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) override;
protected:
+ template <class TLogoBlobCallback, class TBlockCallback, class TReadBarrierCallback, class TBlockV2Callback>
+ void ForEachImpl(TLogoBlobCallback fblob, TBlockCallback fblock, TReadBarrierCallback fbar,
+ TBlockV2Callback
+ fblock2) {
+ Decompress();
+
+ using THeapItem = std::tuple<TRecordHdr::ESyncLogRecType, const void*, ui32>;
+ auto comp = [](const THeapItem& x, const THeapItem& y) { return std::get<2>(y) < std::get<2>(x); };
+
+ TStackVec<THeapItem, 4> heap;
+
+ #define ADD_HEAP(NAME, TYPE) \
+ if (!Records.NAME.empty()) { \
+ heap.emplace_back(TRecordHdr::TYPE, &Records.NAME.front(), Records.NAME.front().Counter); \
+ }
+ ADD_HEAP(LogoBlobs, RecLogoBlob)
+ ADD_HEAP(Blocks, RecBlock)
+ ADD_HEAP(Barriers, RecBarrier)
+ ADD_HEAP(BlocksV2, RecBlockV2)
+
+ std::make_heap(heap.begin(), heap.end(), comp);
+
+ while (!heap.empty()) {
+ std::pop_heap(heap.begin(), heap.end(), comp);
+ auto& item = heap.back(); // say thanks to Microsoft compiler for not supporting tuple binding correctly
+ auto& type = std::get<0>(item);
+ auto& ptr = std::get<1>(item);
+ auto& counter = std::get<2>(item);
+ switch (type) {
+ #define PROCESS(NAME, TYPE, FUNC) \
+ case TRecordHdr::TYPE: { \
+ using T = std::decay_t<decltype(Records.NAME)>::value_type; \
+ const T *item = static_cast<const T*>(ptr); \
+ FUNC(item); \
+ if (++item != Records.NAME.data() + Records.NAME.size()) { \
+ ptr = item; \
+ counter = item->Counter; \
+ std::push_heap(heap.begin(), heap.end(), comp); \
+ } else { \
+ heap.pop_back(); \
+ } \
+ break; \
+ }
+ PROCESS(LogoBlobs, RecLogoBlob, fblob)
+ PROCESS(Blocks, RecBlock, fblock)
+ PROCESS(Barriers, RecBarrier, fbar)
+ PROCESS(BlocksV2, RecBlockV2, fblock2)
+ }
+ }
+ }
TRecordsWithSerial Records;
virtual bool Decompress() = 0;
@@ -123,6 +216,10 @@ namespace NKikimr {
public:
TFragmentReader(const TString &data);
+ std::vector<const TRecordHdr*> ListRecords() {
+ return Impl->ListRecords();
+ }
+
void ForEach(TReadLogoBlobRec fblob, TReadBlockRec fblock, TReadBarrierRec fbar, TReadBlockRecV2 fblock2) {
Impl->ForEach(fblob, fblock, fbar, fblock2);
}
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp
index ab2b6dc161..b3611db53c 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.cpp
@@ -11,6 +11,11 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
// TNaiveFragmentWriter
////////////////////////////////////////////////////////////////////////////
+ void TNaiveFragmentWriter::Clear() {
+ Chain = {{64 << 10}}; // 64 KiB initial storage
+ DataSize = 0;
+ }
+
void TNaiveFragmentWriter::Finish(TString *respData) {
respData->clear();
respData->reserve(DataSize);
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h
index a31582e83d..998c60e060 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h
@@ -18,10 +18,11 @@ namespace NKikimr {
size_t DataSize;
public:
- TNaiveFragmentWriter()
- : Chain({{64 << 10}}) // 64 KiB initial storage
- , DataSize(0)
- {}
+ TNaiveFragmentWriter() {
+ Clear();
+ }
+
+ virtual ~TNaiveFragmentWriter() = default;
size_t GetSize() const {
return DataSize;
@@ -48,7 +49,9 @@ namespace NKikimr {
}
}
- void Finish(TString *respData);
+ void Clear();
+
+ virtual void Finish(TString *respData);
};
////////////////////////////////////////////////////////////////////////////
@@ -60,7 +63,7 @@ namespace NKikimr {
: TNaiveFragmentWriter()
{}
- void Finish(TString *respData);
+ virtual void Finish(TString *respData) override;
};
////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 3f6337385e..17049b6f0f 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1228,12 +1228,41 @@ message TImmediateControlsConfig {
DefaultValue: 8388608 }];
}
+ message TVDiskControls {
+ optional uint64 EnableLocalSyncLogDataCutting = 1 [(ControlOptions) = {
+ Description: "Allow cutting large TEvLocalSyncLogData messages into smaller chunks",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ optional uint64 EnableSyncLogChunkCompressionHDD = 2 [(ControlOptions) = {
+ Description: "Compress SyncLog chunks before writing to log, setting for HDD",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 1 }];
+ optional uint64 EnableSyncLogChunkCompressionSSD = 3 [(ControlOptions) = {
+ Description: "Compress SyncLog chunks before writing to log, setting for SSD",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ optional uint64 MaxSyncLogChunksInFlightHDD = 4 [(ControlOptions) = {
+ Description: "Maximum number of SyncLog chunks written simultaneously to log, setting for HDD",
+ MinValue: 1,
+ MaxValue: 1024,
+ DefaultValue: 10 }];
+ optional uint64 MaxSyncLogChunksInFlightSSD = 5 [(ControlOptions) = {
+ Description: "Maximum number of SyncLog chunks written simultaneously to log, setting for SSD",
+ MinValue: 1,
+ MaxValue: 1024,
+ DefaultValue: 10 }];
+ }
+
optional TDataShardControls DataShardControls = 1;
optional TTxLimitControls TxLimitControls = 2;
optional TCoordinatorControls CoordinatorControls = 3;
optional TSchemeShardControls SchemeShardControls = 4;
optional TTCMallocControls TCMallocControls = 5;
reserved 6;
+ optional TVDiskControls VDiskControls = 7;
};
message TMeteringConfig {
diff --git a/ydb/core/quoter/ut_helpers.cpp b/ydb/core/quoter/ut_helpers.cpp
index ca36d439c9..61b582b76e 100644
--- a/ydb/core/quoter/ut_helpers.cpp
+++ b/ydb/core/quoter/ut_helpers.cpp
@@ -180,7 +180,7 @@ TKesusProxyTestSetup::TKesusProxyTestSetup() {
}
TTestActorRuntime::TEgg MakeEgg() {
- return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr };
+ return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} };
}
void TKesusProxyTestSetup::Start() {
diff --git a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp
index 83189b339d..ec7cd7e473 100644
--- a/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp
+++ b/ydb/core/sys_view/partition_stats/partition_stats_ut.cpp
@@ -13,7 +13,7 @@ Y_UNIT_TEST_SUITE(PartitionStats) {
TTestActorRuntime::TEgg MakeEgg()
{
- return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr };
+ return { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} };
}
void WaitForBootstrap(TTestActorRuntime &runtime) {
diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h
index 75f69ca4e5..b2b53302f1 100644
--- a/ydb/core/tablet_flat/test/libs/exec/runner.h
+++ b/ydb/core/tablet_flat/test/libs/exec/runner.h
@@ -46,7 +46,7 @@ namespace NFake {
auto *types = NTable::NTest::DbgRegistry();
auto *app = new TAppData(0, 0, 0, 0, { }, types, nullptr, nullptr, nullptr);
- Env.Initialize({ app, nullptr, nullptr });
+ Env.Initialize({ app, nullptr, nullptr, {} });
Env.SetDispatchTimeout(DEFAULT_DISPATCH_TIMEOUT);
Env.SetLogPriority(NKikimrServices::FAKE_ENV, NActors::NLog::PRI_INFO);
diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp
index 0bc5dbb9e9..51341a7625 100644
--- a/ydb/core/testlib/actors/test_runtime.cpp
+++ b/ydb/core/testlib/actors/test_runtime.cpp
@@ -154,6 +154,10 @@ namespace NActors {
nodeAppData->GraphConfig = app0->GraphConfig;
nodeAppData->EnableMvccSnapshotWithLegacyDomainRoot = app0->EnableMvccSnapshotWithLegacyDomainRoot;
nodeAppData->IoContextFactory = app0->IoContextFactory;
+ if (nodeIndex < egg.Icb.size()) {
+ nodeAppData->Icb = std::move(egg.Icb[nodeIndex]);
+ nodeAppData->InFlightLimiterRegistry.Reset(new NKikimr::NGRpcService::TInFlightLimiterRegistry(nodeAppData->Icb));
+ }
if (KeyConfigGenerator) {
nodeAppData->KeyConfig = KeyConfigGenerator(nodeIndex);
} else {
diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h
index 4ca29fe1e1..3b627b1e15 100644
--- a/ydb/core/testlib/actors/test_runtime.h
+++ b/ydb/core/testlib/actors/test_runtime.h
@@ -3,6 +3,7 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/mon/mon.h>
#include <ydb/core/base/memobserver.h>
+#include <ydb/core/control/immediate_control_board_impl.h>
#include <ydb/core/protos/shared_cache.pb.h>
#include <ydb/library/actors/testlib/test_runtime.h>
@@ -47,6 +48,7 @@ namespace NActors {
TAutoPtr<NKikimr::TAppData> App0;
TAutoPtr<NActors::IDestructable> Opaque;
TKeyConfigGenerator KeyConfigGenerator;
+ std::vector<TIntrusivePtr<NKikimr::TControlBoard>> Icb;
};
TTestActorRuntime(THeSingleSystemEnv d);
diff --git a/ydb/core/testlib/actors/test_runtime_ut.cpp b/ydb/core/testlib/actors/test_runtime_ut.cpp
index b2c7ec58cf..d649df72fc 100644
--- a/ydb/core/testlib/actors/test_runtime_ut.cpp
+++ b/ydb/core/testlib/actors/test_runtime_ut.cpp
@@ -12,7 +12,7 @@ Y_UNIT_TEST_SUITE(TActorTest) {
TTestActorRuntime::TEgg MakeEgg()
{
return
- { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr };
+ { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} };
}
Y_UNIT_TEST(TestHandleEvent) {
diff --git a/ydb/core/testlib/basics/appdata.cpp b/ydb/core/testlib/basics/appdata.cpp
index b27ca91cac..b4c8ff3868 100644
--- a/ydb/core/testlib/basics/appdata.cpp
+++ b/ydb/core/testlib/basics/appdata.cpp
@@ -71,7 +71,7 @@ namespace NKikimr {
NKikimrProto::TKeyConfig();
};
- return { app, Mine.Release(), keyGenerator};
+ return { app, Mine.Release(), keyGenerator, std::move(Icb) };
}
void TAppPrepare::AddDomain(TDomainsInfo::TDomain* domain)
@@ -200,4 +200,11 @@ namespace NKikimr {
{
AwsCompatibilityConfig.SetAwsRegion(value);
}
+
+ void TAppPrepare::InitIcb(ui32 numNodes)
+ {
+ for (ui32 i = 0; i < numNodes; ++i) {
+ Icb.emplace_back(new TControlBoard);
+ }
+ }
}
diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h
index c4a43bb444..bf2aaf125b 100644
--- a/ydb/core/testlib/basics/appdata.h
+++ b/ydb/core/testlib/basics/appdata.h
@@ -87,6 +87,7 @@ namespace NKikimr {
void SetEnablePqBilling(std::optional<bool> value);
void SetEnableDbCounters(bool value);
void SetAwsRegion(const TString& value);
+ void InitIcb(ui32 numNodes);
TIntrusivePtr<TChannelProfiles> Channels;
NKikimrBlobStorage::TNodeWardenServiceSet BSConf;
@@ -104,6 +105,8 @@ namespace NKikimr {
NKikimrConfig::TAwsCompatibilityConfig AwsCompatibilityConfig;
NKikimrConfig::TS3ProxyResolverConfig S3ProxyResolverConfig;
NKikimrConfig::TGraphConfig GraphConfig;
+ NKikimrConfig::TImmediateControlsConfig ImmediateControlsConfig;
+ std::vector<TIntrusivePtr<NKikimr::TControlBoard>> Icb;
private:
TAutoPtr<TMine> Mine;
diff --git a/ydb/core/testlib/basics/helpers.h b/ydb/core/testlib/basics/helpers.h
index 9545275794..3a77ce61fa 100644
--- a/ydb/core/testlib/basics/helpers.h
+++ b/ydb/core/testlib/basics/helpers.h
@@ -53,6 +53,8 @@ namespace NFake {
void SetupSchemeCache(TTestActorRuntime& runtime, ui32 nodeIndex, const TString& root);
void SetupQuoterService(TTestActorRuntime& runtime, ui32 nodeIndex);
void SetupSysViewService(TTestActorRuntime& runtime, ui32 nodeIndex);
+ void SetupIcb(TTestActorRuntime& runtime, ui32 nodeIndex, const NKikimrConfig::TImmediateControlsConfig& config,
+ const TIntrusivePtr<NKikimr::TControlBoard>& icb);
// StateStorage, NodeWarden, TabletResolver, ResourceBroker, SharedPageCache
void SetupBasicServices(TTestActorRuntime &runtime, TAppPrepare &app, bool mockDisk = false,
diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp
index 9d98ea164a..a6ec2f394d 100644
--- a/ydb/core/testlib/basics/services.cpp
+++ b/ydb/core/testlib/basics/services.cpp
@@ -9,6 +9,8 @@
#include <ydb/core/base/statestorage_impl.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/tablet_resolver.h>
+#include <ydb/core/cms/console/immediate_controls_configurator.h>
+#include <ydb/core/control/immediate_control_board_actor.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.h>
#include <ydb/core/quoter/quoter_service.h>
@@ -40,6 +42,20 @@ namespace NPDisk {
extern const ui64 YdbDefaultPDiskSequence = 0x7e5700007e570000;
}
+ void SetupIcb(TTestActorRuntime& runtime, ui32 nodeIndex, const NKikimrConfig::TImmediateControlsConfig& config,
+ const TIntrusivePtr<NKikimr::TControlBoard>& icb)
+ {
+ runtime.AddLocalService(MakeIcbId(runtime.GetNodeId(nodeIndex)),
+ TActorSetupCmd(CreateImmediateControlActor(icb, runtime.GetDynamicCounters(nodeIndex)),
+ TMailboxType::ReadAsFilled, 0),
+ nodeIndex);
+
+ runtime.AddLocalService(TActorId{},
+ TActorSetupCmd(NConsole::CreateImmediateControlsConfigurator(icb, config),
+ TMailboxType::ReadAsFilled, 0),
+ nodeIndex);
+ }
+
void SetupBSNodeWarden(TTestActorRuntime& runtime, ui32 nodeIndex, TIntrusivePtr<TNodeWardenConfig> nodeWardenConfig)
{
runtime.AddLocalService(MakeBlobStorageNodeWardenID(runtime.GetNodeId(nodeIndex)),
@@ -325,12 +341,17 @@ namespace NPDisk {
app.AddHive(0);
}
+ while (app.Icb.size() < runtime.GetNodeCount()) {
+ app.Icb.emplace_back(new TControlBoard);
+ }
+
for (ui32 nodeIndex = 0; nodeIndex < runtime.GetNodeCount(); ++nodeIndex) {
SetupStateStorageGroups(runtime, nodeIndex);
NKikimrProto::TKeyConfig keyConfig;
if (const auto it = app.Keys.find(nodeIndex); it != app.Keys.end()) {
keyConfig = it->second;
}
+ SetupIcb(runtime, nodeIndex, app.ImmediateControlsConfig, app.Icb[nodeIndex]);
SetupBSNodeWarden(runtime, nodeIndex, disk.MakeWardenConf(*app.Domains, keyConfig));
SetupTabletResolver(runtime, nodeIndex);
diff --git a/ydb/core/testlib/tenant_runtime.cpp b/ydb/core/testlib/tenant_runtime.cpp
index 64f2af721a..994928e4cf 100644
--- a/ydb/core/testlib/tenant_runtime.cpp
+++ b/ydb/core/testlib/tenant_runtime.cpp
@@ -820,6 +820,7 @@ void TTenantTestRuntime::Setup(bool createTenantPools)
TAppPrepare app;
app.FeatureFlags = Extension.GetFeatureFlags();
+ app.ImmediateControlsConfig = Extension.GetImmediateControlsConfig();
app.ClearDomainsAndHive();
ui32 planResolution = 500;
@@ -854,7 +855,9 @@ void TTenantTestRuntime::Setup(bool createTenantPools)
app.AddHive(Config.HiveId);
}
- for (size_t i = 0; i< Config.Nodes.size(); ++i) {
+ app.InitIcb(Config.Nodes.size());
+
+ for (size_t i = 0; i < Config.Nodes.size(); ++i) {
AddLocalService(NNodeWhiteboard::MakeNodeWhiteboardServiceId(GetNodeId(i)),
TActorSetupCmd(new TFakeNodeWhiteboardService, TMailboxType::Simple, 0), i);
}
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 85c69635ef..4093006a74 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -185,6 +185,8 @@ namespace Tests {
app.SetAwsRegion(Settings->AwsRegion);
app.CompactionConfig = Settings->CompactionConfig;
app.FeatureFlags = Settings->FeatureFlags;
+ app.ImmediateControlsConfig = Settings->Controls;
+ app.InitIcb(StaticNodes() + DynamicNodes());
Runtime = MakeHolder<TTestBasicRuntime>(StaticNodes() + DynamicNodes(), Settings->UseRealThreads);
@@ -268,14 +270,9 @@ namespace Tests {
//
for (ui32 nodeIdx = 0; nodeIdx < StaticNodes(); ++nodeIdx) {
SetupDomainLocalService(nodeIdx);
- SetupConfigurators(nodeIdx);
SetupProxies(nodeIdx);
}
- for (ui32 nodeIdx = StaticNodes(); nodeIdx < StaticNodes() + DynamicNodes(); ++nodeIdx) {
- SetupConfigurators(nodeIdx);
- }
-
CreateBootstrapTablets();
SetupStorage();
}
diff --git a/ydb/library/ncloud/impl/access_service_ut.cpp b/ydb/library/ncloud/impl/access_service_ut.cpp
index 3ac708f071..fd8f15c085 100644
--- a/ydb/library/ncloud/impl/access_service_ut.cpp
+++ b/ydb/library/ncloud/impl/access_service_ut.cpp
@@ -34,7 +34,7 @@ struct TTestSetup : public NUnitTest::TBaseFixture {
TTestActorRuntime::TEgg MakeEgg() {
return
- { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr };
+ { new TAppData(0, 0, 0, 0, { }, nullptr, nullptr, nullptr, nullptr), nullptr, nullptr, {} };
}
void Init() {
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 6e3dfaa488..2a70e16707 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -1032,5 +1032,6 @@ message TActivity {
BACKUP_CONTROLLER_TABLET = 635;
REPLICATION_CONTROLLER_SECRET_RESOLVER = 636;
REPLICATION_CONTROLLER_DST_ALTERER = 637;
+ VDISK_LOCALSYNCDATA_CUTTER = 638;
};
};