summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvporyadke <[email protected]>2024-03-28 15:31:15 +0100
committerGitHub <[email protected]>2024-03-28 15:31:15 +0100
commitd127a0339c7b3518cbe4d5f52cd309dbc57ba2da (patch)
treefc550383beb3cf6e7d86ea5dbd2eaa1f3e4f491f
parent3ba110649e625232c8a530145783f1cfdc22c9fc (diff)
do schema snapshots after a reassign (#3187)
-rw-r--r--ydb/core/protos/scheme_log.proto3
-rw-r--r--ydb/core/tablet_flat/flat_boot_alter.h11
-rw-r--r--ydb/core/tablet_flat/flat_boot_snap.h8
-rw-r--r--ydb/core/tablet_flat/flat_dbase_scheme.cpp9
-rw-r--r--ydb/core/tablet_flat/flat_dbase_scheme.h1
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp12
-rw-r--r--ydb/core/tablet_flat/flat_executor_bootlogic.h1
-rw-r--r--ydb/core/tablet_flat/flat_executor_ut.cpp54
-rw-r--r--ydb/core/tablet_flat/logic_alter_main.h12
-rw-r--r--ydb/core/tablet_flat/test/libs/exec/helper.h2
-rw-r--r--ydb/core/tablet_flat/test/libs/exec/runner.h8
11 files changed, 113 insertions, 8 deletions
diff --git a/ydb/core/protos/scheme_log.proto b/ydb/core/protos/scheme_log.proto
index 70d313e2f7d..4635167fcb0 100644
--- a/ydb/core/protos/scheme_log.proto
+++ b/ydb/core/protos/scheme_log.proto
@@ -73,4 +73,7 @@ message TAlterRecord {
message TSchemeChanges {
repeated TAlterRecord Delta = 1;
+ // If the flag bellow is set, delete all scheme blobs before this
+ // Note that an old version will ignore the flag and will not delete anything
+ optional bool Rewrite = 2 [default = false];
};
diff --git a/ydb/core/tablet_flat/flat_boot_alter.h b/ydb/core/tablet_flat/flat_boot_alter.h
index 2d058360d9b..7eaf6f95deb 100644
--- a/ydb/core/tablet_flat/flat_boot_alter.h
+++ b/ydb/core/tablet_flat/flat_boot_alter.h
@@ -64,24 +64,31 @@ namespace NBoot {
void Apply(const NPageCollection::TLargeGlobId &largeGlobId, TArrayRef<const char> body) noexcept
{
+ bool rewrite = false;
if (body) {
TProtoBox<NTable::TSchemeChanges> alter(body);
NTable::TSchemeModifier apply(*Back->Scheme);
auto changed = apply.Apply(alter);
+ rewrite = alter.GetRewrite();
if (auto logl = Env->Logger()->Log(ELnLev::Debug)) {
logl
<< NFmt::Do(*Back) << " alter log "
<< NFmt::TStamp(NTable::TTxStamp(largeGlobId.Lead).Raw)
<< ", " << (changed ? "update" : "noop")
- << " affects " << NFmt::Arr(apply.Affects);
+ << " affects " << NFmt::Arr(apply.Affects)
+ << ", is " << (rewrite ? "" : "not a ") << "rewrite";
}
}
- if (auto *logic = Logic->Result().Alter.Get())
+ if (auto *logic = Logic->Result().Alter.Get()) {
+ if (rewrite) {
+ logic->Clear();
+ }
logic->RestoreLog(largeGlobId);
+ }
}
private:
diff --git a/ydb/core/tablet_flat/flat_boot_snap.h b/ydb/core/tablet_flat/flat_boot_snap.h
index 063d01a1032..fc4c42b2c9d 100644
--- a/ydb/core/tablet_flat/flat_boot_snap.h
+++ b/ydb/core/tablet_flat/flat_boot_snap.h
@@ -142,8 +142,14 @@ namespace NBoot {
blobs.reserve(Proto.SchemeInfoBodiesSize());
- for (const auto &one : Proto.GetSchemeInfoBodies())
+ for (const auto &one : Proto.GetSchemeInfoBodies()) {
blobs.emplace_back(LogoBlobIDFromLogoBlobID(one));
+ const auto& blob = blobs.back();
+ const auto* channel = Logic->Info->ChannelInfo(blob.Channel());
+ if (channel && blob.Generation() < channel->LatestEntry()->FromGeneration) {
+ Logic->Result().ShouldSnapshotScheme = true;
+ }
+ }
NPageCollection::TGroupBlobsByCookie chop(blobs);
diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.cpp b/ydb/core/tablet_flat/flat_dbase_scheme.cpp
index 548bdf98848..b38f11a16a9 100644
--- a/ydb/core/tablet_flat/flat_dbase_scheme.cpp
+++ b/ydb/core/tablet_flat/flat_dbase_scheme.cpp
@@ -58,7 +58,6 @@ TAutoPtr<TSchemeChanges> TScheme::GetSnapshot() const {
delta.SetExecutorLogFlushPeriod(Executor.LogFlushPeriod);
delta.SetExecutorResourceProfile(Executor.ResourceProfile);
delta.SetExecutorFastLogPolicy(Executor.LogFastTactic);
-
return delta.Flush();
}
@@ -316,6 +315,12 @@ TAlter& TAlter::SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 max
return ApplyLastRecord();
}
+TAlter& TAlter::SetRewrite()
+{
+ Log.SetRewrite(true);
+ return *this;
+}
+
TAlter::operator bool() const noexcept
{
return Log.DeltaSize() > 0;
@@ -324,7 +329,7 @@ TAlter::operator bool() const noexcept
TAutoPtr<TSchemeChanges> TAlter::Flush()
{
TAutoPtr<TSchemeChanges> log(new TSchemeChanges);
- log->MutableDelta()->Swap(Log.MutableDelta());
+ log->Swap(&Log);
return log;
}
diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.h b/ydb/core/tablet_flat/flat_dbase_scheme.h
index 668fe9ecbae..c37f9574c4e 100644
--- a/ydb/core/tablet_flat/flat_dbase_scheme.h
+++ b/ydb/core/tablet_flat/flat_dbase_scheme.h
@@ -259,6 +259,7 @@ public:
TAlter& SetByKeyFilter(ui32 tableId, bool enabled);
TAlter& SetColdBorrow(ui32 tableId, bool enabled);
TAlter& SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 maxBytes);
+ TAlter& SetRewrite();
TAutoPtr<TSchemeChanges> Flush();
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index e7d4d4e9e8c..9eb6976fec7 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -486,6 +486,18 @@ void TExecutor::Active(const TActorContext &ctx) {
MakeLogSnapshot();
+ if (loadedState->ShouldSnapshotScheme) {
+ TTxStamp stamp = Stamp();
+ auto alter = Database->GetScheme().GetSnapshot();
+ alter->SetRewrite(true);
+ auto change = alter->SerializeAsString();
+ Database->RollUp(stamp, change, {}, {});
+ auto commit = CommitManager->Begin(true, ECommit::Misc, {});
+ LogicAlter->Clear();
+ LogicAlter->WriteLog(*commit, std::move(change));
+ CommitManager->Commit(commit);
+ }
+
if (auto error = CheckBorrowConsistency()) {
if (auto logl = Logger->Log(ELnLev::Crit))
logl << NFmt::Do(*this) << " Borrow consistency failed: " << error;
diff --git a/ydb/core/tablet_flat/flat_executor_bootlogic.h b/ydb/core/tablet_flat/flat_executor_bootlogic.h
index 56084c36b0c..bbc94ebe397 100644
--- a/ydb/core/tablet_flat/flat_executor_bootlogic.h
+++ b/ydb/core/tablet_flat/flat_executor_bootlogic.h
@@ -37,6 +37,7 @@ namespace NBoot {
THashMap<ui32, NTable::TRowVersionRanges> RemovedRowVersions;
TVector<TIntrusivePtr<TPrivatePageCache::TInfo>> PageCaches;
+ bool ShouldSnapshotScheme = false;
};
}
diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp
index 4c78841a331..08299bec9de 100644
--- a/ydb/core/tablet_flat/flat_executor_ut.cpp
+++ b/ydb/core/tablet_flat/flat_executor_ut.cpp
@@ -596,6 +596,60 @@ Y_UNIT_TEST_SUITE(TFlatTableCompactionScan) {
env->GrabEdgeEventRethrow<TEvTestFlatTablet::TEvScanFinished>(handle);
env.SendSync(new TEvents::TEvPoison, false, true);
}
+
+ Y_UNIT_TEST(TestCompactionScanWIthReassignAndReboots) {
+ TMyEnvBase env;
+ TRowsModel data;
+
+ env->SetLogPriority(NKikimrServices::TABLET_FLATBOOT, NActors::NLog::PRI_DEBUG);
+
+ env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
+ return new TTestFlatTablet(env.Edge, tablet, info);
+ });
+
+ env.WaitForWakeUp();
+
+ TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy();
+ policy->InMemSizeToSnapshot = 40 * 1024 *1024;
+ policy->InMemStepsToSnapshot = 10;
+ policy->InMemForceStepsToSnapshot = 10;
+ policy->InMemForceSizeToSnapshot = 64 * 1024 * 1024;
+ policy->InMemResourceBrokerTask = NLocalDb::LegacyQueueIdToTaskName(0);
+ policy->ReadAheadHiThreshold = 100000;
+ policy->ReadAheadLoThreshold = 50000;
+ policy->Generations.push_back({100 * 1024 * 1024, 5, 5, 200 * 1024 * 1024, NLocalDb::LegacyQueueIdToTaskName(1), true});
+ policy->Generations.push_back({400 * 1024 * 1024, 5, 5, 800 * 1024 * 1024, NLocalDb::LegacyQueueIdToTaskName(2), false});
+ for (auto& gen : policy->Generations) {
+ gen.ExtraCompactionPercent = 0;
+ gen.ExtraCompactionMinSize = 0;
+ gen.ExtraCompactionExpPercent = 0;
+ gen.ExtraCompactionExpMaxSize = 0;
+ gen.UpliftPartSize = 0;
+ }
+
+ env.SendSync(data.MakeScheme(std::move(policy)));
+ env.SendSync(data.MakeRows(249));
+
+ env.SendSync(new TEvents::TEvPoison, false, true);
+ IActor *tabletActor = nullptr; // save tablet to get its actor id and avoid using tablet resolver which has outdated info
+ for (unsigned iter = 0; iter < 3; ++iter) {
+ struct TReassignedStarter : NFake::TStarter {
+ NFake::TStorageInfo* MakeTabletInfo(ui64 tablet) noexcept override {
+ auto *info = TStarter::MakeTabletInfo(tablet);
+ info->Channels[0].History.emplace_back(3, 3);
+ return info;
+ }
+ };
+ TReassignedStarter starter;
+ env.FireTablet(env.Edge, env.Tablet, [&env, &tabletActor](const TActorId &tablet, TTabletStorageInfo *info) mutable {
+ return tabletActor = new TTestFlatTablet(env.Edge, tablet, info);
+ }, 0, &starter);
+ env.WaitForWakeUp();
+ env.SendEv(tabletActor->SelfId(), data.MakeRows(500));
+ env.SendEv(tabletActor->SelfId(), new TEvents::TEvPoison);
+ }
+ env.SendEv(tabletActor->SelfId(), new TEvents::TEvPoison);
+ }
}
diff --git a/ydb/core/tablet_flat/logic_alter_main.h b/ydb/core/tablet_flat/logic_alter_main.h
index 8fc4ba024cc..2aa58011118 100644
--- a/ydb/core/tablet_flat/logic_alter_main.h
+++ b/ydb/core/tablet_flat/logic_alter_main.h
@@ -36,6 +36,11 @@ namespace NTabletFlatExecutor {
auto items = snap.MutableSchemeInfoBodies();
for (const auto &logo : Log)
LogoBlobIDFromLogoBlobID(logo, items->Add());
+
+ auto deleted = snap.MutableGcSnapLeft();
+ for (const auto &logo : ObsoleteLog) {
+ LogoBlobIDFromLogoBlobID(logo, deleted->Add());
+ }
}
void WriteLog(TLogCommit &commit, TString alter) noexcept
@@ -50,11 +55,18 @@ namespace NTabletFlatExecutor {
}
}
+ void Clear() noexcept
+ {
+ ObsoleteLog.splice(ObsoleteLog.end(), Log);
+ Bytes = 0;
+ }
+
protected:
TAutoPtr<NPageCollection::TSteppedCookieAllocator> Cookies;
NPageCollection::TSlicer Slicer;
ui64 Bytes = 0;
TList<TLogoBlobID> Log;
+ TList<TLogoBlobID> ObsoleteLog;
};
}
diff --git a/ydb/core/tablet_flat/test/libs/exec/helper.h b/ydb/core/tablet_flat/test/libs/exec/helper.h
index cfae1809379..9e420d75221 100644
--- a/ydb/core/tablet_flat/test/libs/exec/helper.h
+++ b/ydb/core/tablet_flat/test/libs/exec/helper.h
@@ -21,7 +21,7 @@ namespace NFake {
return new NFake::TOwner(user, retry, info, setup, followerId);
}
- static TStorageInfo* MakeTabletInfo(ui64 tablet) noexcept
+ virtual TStorageInfo* MakeTabletInfo(ui64 tablet) noexcept
{
const auto none = TErasureType::ErasureNone;
diff --git a/ydb/core/tablet_flat/test/libs/exec/runner.h b/ydb/core/tablet_flat/test/libs/exec/runner.h
index 9571e4d6bcb..75f69ca4e5d 100644
--- a/ydb/core/tablet_flat/test/libs/exec/runner.h
+++ b/ydb/core/tablet_flat/test/libs/exec/runner.h
@@ -66,11 +66,15 @@ namespace NFake {
return &Env;
}
- void FireTablet(TActorId user, ui32 tablet, TStarter::TMake make, ui32 followerId = 0)
+ void FireTablet(TActorId user, ui32 tablet, TStarter::TMake make, ui32 followerId = 0, TStarter *starter = nullptr)
{
const auto mbx = EMail::Simple;
+ TStarter defaultStarter;
+ if (starter == nullptr) {
+ starter = &defaultStarter;
+ }
- RunOn(7, { }, TStarter().Do(user, 1, tablet, std::move(make), followerId), mbx);
+ RunOn(7, { }, starter->Do(user, 1, tablet, std::move(make), followerId), mbx);
}
void FireFollower(TActorId user, ui32 tablet, TStarter::TMake make, ui32 followerId)