aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-03-23 22:00:00 +0300
committerchertus <azuikov@ydb.tech>2023-03-23 22:00:00 +0300
commitb5332ecf0bacc050ad3ff69a7252f74acbaac61c (patch)
tree187788249287905ff7cb9c51208c59fc65b2060a
parente02bdc34e84447047452610391d45f158f29ff03 (diff)
downloadydb-b5332ecf0bacc050ad3ff69a7252f74acbaac61c.tar.gz
better background tasks activation
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp5
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp79
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h39
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h16
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp51
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h4
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp3
9 files changed, 124 insertions, 77 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 44a82e18265..4fa8d2fd8fd 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -129,7 +129,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
return;
}
- if (LastBackActivation < TInstant::Now() - ActivationPeriod) {
+ if (LastPeriodicBackActivation < TInstant::Now() - ActivationPeriod) {
SendWaitPlanStep(GetOutdatedStep());
}
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 65c03dbb97a..beaa2f7b770 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -206,7 +206,7 @@ public:
switch (Trigger) {
case ETriggerActivities::POST_INSERT:
- Self->EnqueueBackgroundActivities(false, true);
+ Self->EnqueueBackgroundActivities(false, TBackgroundActivity::Indexation());
break;
case ETriggerActivities::POST_SCHEMA:
Self->EnqueueBackgroundActivities();
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index a0afaf3c3cd..b9da7837ea7 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -36,6 +36,7 @@ private:
THashMap<TString, TPathIdBlobs> ExportTierBlobs;
THashSet<NOlap::TEvictedBlob> BlobsToForget;
ui64 ExportNo = 0;
+ TBackgroundActivity TriggerActivity = TBackgroundActivity::All();
};
@@ -266,12 +267,14 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Self->IncCounter(COUNTER_COMPACTION_TIME, Ev->Get()->Duration.MilliSeconds());
} else if (changes->IsCleanup()) {
Self->ActiveCleanup = false;
+ TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Cleanup() : TBackgroundActivity::None();
Self->BlobManager->GetCleanupBlobs(BlobsToForget);
Self->IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL);
} else if (changes->IsTtl()) {
Self->ActiveTtl = false;
+ //TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Ttl() : TBackgroundActivity::None();
// Do not start new TTL till we evict current PortionsToEvict. We could evict them twice otherwise
Y_VERIFY(!Self->ActiveEvictions, "Unexpected active evictions count at tablet %lu", Self->TabletID());
@@ -293,7 +296,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
if (Ev->Get()->PutStatus == NKikimrProto::TRYLATER) {
ctx.Schedule(Self->FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
} else {
- Self->EnqueueBackgroundActivities();
+ Self->EnqueueBackgroundActivities(false, TriggerActivity);
}
for (auto& [tierName, pathBlobs] : ExportTierBlobs) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index bda32dc0c08..e8cb254e13d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -672,15 +672,18 @@ void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& s
}
}
-void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) {
- if (periodic && LastBackActivation > TInstant::Now() - ActivationPeriod) {
- return;
+void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) {
+ if (periodic) {
+ if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) {
+ return;
+ }
+ LastPeriodicBackActivation = TInstant::Now();
}
const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId());
SendPeriodicStats();
- if (insertOnly) {
+ if (activity.IndexationOnly()) {
if (auto event = SetupIndexation()) {
ctx.Send(IndexingActor, event.release());
}
@@ -690,44 +693,56 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) {
// Preventing conflicts between indexing and compaction leads to election between them.
// Indexing vs compaction probability depends on index and insert table overload status.
// Prefer compaction: 25% by default; 50% if IndexOverloaded(); 6.25% if InsertTableOverloaded().
- ui32 mask = IndexOverloaded() ? 0x1 : 0x3;
- if (InsertTableOverloaded()) {
- mask = 0x0F;
- }
- bool preferIndexing = (++BackgroundActivation) & mask;
+ if (activity.HasIndexation() && activity.HasCompaction()) {
+ ui32 mask = IndexOverloaded() ? 0x1 : 0x3;
+ if (InsertTableOverloaded()) {
+ mask = 0x0F;
+ }
+ bool preferIndexing = (++BackgroundActivation) & mask;
- if (preferIndexing) {
+ if (preferIndexing) {
+ if (auto evIdx = SetupIndexation()) {
+ ctx.Send(IndexingActor, evIdx.release());
+ } else if (auto event = SetupCompaction()) {
+ ctx.Send(CompactionActor, event.release());
+ }
+ } else {
+ if (auto event = SetupCompaction()) {
+ ctx.Send(CompactionActor, event.release());
+ } else if (auto evIdx = SetupIndexation()) {
+ ctx.Send(IndexingActor, evIdx.release());
+ }
+ }
+ } else if (activity.HasIndexation()) {
if (auto evIdx = SetupIndexation()) {
ctx.Send(IndexingActor, evIdx.release());
- } else if (auto event = SetupCompaction()) {
- ctx.Send(CompactionActor, event.release());
}
- } else {
+ } else if (activity.HasCompaction()) {
if (auto event = SetupCompaction()) {
ctx.Send(CompactionActor, event.release());
- } else if (auto evIdx = SetupIndexation()) {
- ctx.Send(IndexingActor, evIdx.release());
}
}
- if (auto event = SetupCleanup()) {
- ctx.Send(SelfId(), event.release());
- } else {
- // Small cleanup (no index changes)
- THashSet<NOlap::TEvictedBlob> blobsToForget;
- BlobManager->GetCleanupBlobs(blobsToForget);
- ForgetBlobs(ctx, blobsToForget);
- }
-
- if (auto event = SetupTtl()) {
- if (event->NeedDataReadWrite()) {
- ctx.Send(EvictionActor, event.release());
+ if (activity.HasCleanup()) {
+ if (auto event = SetupCleanup()) {
+ ctx.Send(SelfId(), event.release());
} else {
- ctx.Send(SelfId(), event->TxEvent.release());
+ // Small cleanup (no index changes)
+ THashSet<NOlap::TEvictedBlob> blobsToForget;
+ BlobManager->GetCleanupBlobs(blobsToForget);
+ ForgetBlobs(ctx, blobsToForget);
}
}
- LastBackActivation = TInstant::Now();
+ if (activity.HasTtl()) {
+ if (auto event = SetupTtl()) {
+ if (event->NeedDataReadWrite()) {
+ ctx.Send(EvictionActor, event.release());
+ } else {
+ ctx.Send(SelfId(), event->TxEvent.release());
+ }
+ }
+ }
}
std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
@@ -834,16 +849,12 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
}
PrimaryIndex->UpdateCompactionLimits(CompactionLimits.Get());
- auto compactionInfo = PrimaryIndex->Compact();
+ auto compactionInfo = PrimaryIndex->Compact(LastCompactedGranule);
if (!compactionInfo || compactionInfo->Empty()) {
LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID());
return {};
}
- // TODO: Compact granules in parallel
-
- // Rotate compaction granules: do not choose the same granule all the time.
- LastCompactedGranule = compactionInfo->ChooseOneGranule(LastCompactedGranule);
Y_VERIFY(compactionInfo->Good());
LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index e7e8ba14dac..568690cf9b4 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -70,6 +70,41 @@ struct TSettings {
}
};
+class TBackgroundActivity {
+public:
+ enum EBackActivity : ui32 {
+ NONE = 0x00,
+ INDEX = 0x01,
+ COMPACT = 0x02,
+ CLEAN = 0x04,
+ TTL = 0x08,
+ ALL = 0xffff
+ };
+
+ static TBackgroundActivity Indexation() { return TBackgroundActivity(INDEX); }
+ static TBackgroundActivity Compaction() { return TBackgroundActivity(COMPACT); }
+ static TBackgroundActivity Cleanup() { return TBackgroundActivity(CLEAN); }
+ static TBackgroundActivity Ttl() { return TBackgroundActivity(TTL); }
+ static TBackgroundActivity All() { return TBackgroundActivity(ALL); }
+ static TBackgroundActivity None() { return TBackgroundActivity(NONE); }
+
+ TBackgroundActivity() = default;
+
+ bool HasIndexation() const { return Activity & INDEX; }
+ bool HasCompaction() const { return Activity & COMPACT; }
+ bool HasCleanup() const { return Activity & CLEAN; }
+ bool HasTtl() const { return Activity & TTL; }
+ bool HasAll() const { return Activity == ALL; }
+ bool IndexationOnly() const { return Activity == INDEX; }
+
+private:
+ EBackActivity Activity = NONE;
+
+ TBackgroundActivity(EBackActivity activity)
+ : Activity(activity)
+ {}
+};
+
using ITransaction = NTabletFlatExecutor::ITransaction;
template <typename T>
@@ -352,7 +387,7 @@ private:
TDuration FailActivationDelay = TDuration::Seconds(1);
TDuration StatsReportInterval = TDuration::Seconds(10);
TInstant LastAccessTime;
- TInstant LastBackActivation;
+ TInstant LastPeriodicBackActivation;
TInstant LastStatsReport;
TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices.
@@ -430,7 +465,7 @@ private:
void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort);
void EnqueueProgressTx(const TActorContext& ctx);
- void EnqueueBackgroundActivities(bool periodic = false, bool insertOnly = false);
+ void EnqueueBackgroundActivities(bool periodic = false, TBackgroundActivity activity = TBackgroundActivity::All());
void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc);
void ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc);
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 4a4aecd7e91..4195fe6b619 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -36,20 +36,6 @@ struct TCompactionInfo {
bool Empty() const { return Granules.empty(); }
bool Good() const { return Granules.size() == 1; }
- ui64 ChooseOneGranule(ui64 lastGranule) {
- Y_VERIFY(Granules.size());
-
- auto it = Granules.upper_bound(lastGranule);
- if (it == Granules.end()) {
- it = Granules.begin();
- }
-
- ui64 granule = *it;
- Granules.clear();
- Granules.insert(granule);
- return granule;
- }
-
friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) {
if (info.Good() == 1) {
ui64 granule = *info.Granules.begin();
@@ -318,7 +304,7 @@ public:
const THashSet<ui32>& columnIds,
std::shared_ptr<TPredicate> from,
std::shared_ptr<TPredicate> to) const = 0;
- virtual std::unique_ptr<TCompactionInfo> Compact() = 0;
+ virtual std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) = 0;
virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo,
const TSnapshot& outdatedSnapshot) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 0af8f029ce5..c628a08ee06 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -775,6 +775,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
}
if (affectedRecords > maxRecords) {
+ changes->NeedRepeat = true;
break;
}
}
@@ -806,6 +807,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
}
if (affectedRecords > maxRecords) {
+ changes->NeedRepeat = true;
break;
}
}
@@ -1496,44 +1498,53 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact
|| sumSize >= limits.GranuleOverloadSize;
}
-std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact() {
- auto info = std::make_unique<TCompactionInfo>();
- info->InGranule = true;
- auto& out = info->Granules;
+std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) {
+ if (CompactionGranules.empty()) {
+ return {};
+ }
+
+ std::optional<ui64> outGranule;
+ bool inGranule = true;
- std::vector<ui64> goodGranules;
- for (ui64 granule : CompactionGranules) {
+ auto it = CompactionGranules.upper_bound(lastCompactedGranule);
+ if (it == CompactionGranules.end()) {
+ it = CompactionGranules.begin();
+ }
+
+ while (!CompactionGranules.empty()) {
+ ui64 granule = *it;
auto spg = Granules.find(granule)->second;
Y_VERIFY(spg);
// We need only actual portions here (with empty XPlanStep:XTxId)
auto actualPortions = GetActualPortions(spg->Portions);
if (actualPortions.empty()) {
+ it = CompactionGranules.erase(it);
continue;
}
ui32 inserted = 0;
bool needSplit = NeedSplit(actualPortions, Limits, inserted);
if (needSplit) {
- if (info->InGranule) {
- info->InGranule = false;
- out.clear(); // clear in-granule candidates, we have a splitting one
- }
- out.insert(granule);
+ inGranule = false;
+ outGranule = granule;
+ break;
} else if (inserted) {
- if (info->InGranule) {
- out.insert(granule);
- }
- } else {
- goodGranules.push_back(granule);
+ outGranule = granule;
+ break;
}
- }
- for (ui64 granule : goodGranules) {
- CompactionGranules.erase(granule);
+ it = CompactionGranules.erase(it);
+ if (it == CompactionGranules.end()) {
+ it = CompactionGranules.begin();
+ }
}
- if (!out.empty()) {
+ if (outGranule) {
+ auto info = std::make_unique<TCompactionInfo>();
+ info->Granules.insert(*outGranule);
+ info->InGranule = inGranule;
+ lastCompactedGranule = *outGranule;
return info;
}
return {};
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 2bd7b86ddaf..0b70368a7a4 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -269,7 +269,7 @@ public:
const THashSet<ui32>& columnIds,
std::shared_ptr<TPredicate> from,
std::shared_ptr<TPredicate> to) const override;
- std::unique_ptr<TCompactionInfo> Compact() override;
+ std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) override;
// Static part of IColumnEngine iface (called from actors). It's static cause there's no threads sync.
@@ -317,7 +317,7 @@ private:
THashSet<ui64> GranulesInSplit;
THashSet<ui64> EmptyGranules;
THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded;
- THashSet<ui64> CompactionGranules;
+ TSet<ui64> CompactionGranules;
THashSet<ui64> CleanupGranules;
TColumnEngineStats Counters;
ui64 LastPortion;
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 2171084b2c3..f9a6aa08522 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -307,7 +307,8 @@ struct TExpected {
bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step,
const TExpected& expected) {
- auto compactionInfo = engine.Compact();
+ ui64 lastCompactedGranule = 0;
+ auto compactionInfo = engine.Compact(lastCompactedGranule);
UNIT_ASSERT_VALUES_EQUAL(compactionInfo->Granules.size(), 1);
UNIT_ASSERT(!compactionInfo->InGranule);