diff options
author | chertus <azuikov@ydb.tech> | 2023-03-23 22:00:00 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-03-23 22:00:00 +0300 |
commit | b5332ecf0bacc050ad3ff69a7252f74acbaac61c (patch) | |
tree | 187788249287905ff7cb9c51208c59fc65b2060a | |
parent | e02bdc34e84447047452610391d45f158f29ff03 (diff) | |
download | ydb-b5332ecf0bacc050ad3ff69a7252f74acbaac61c.tar.gz |
better background tasks activation
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 79 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 39 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine.h | 16 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 51 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 3 |
9 files changed, 124 insertions, 77 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 44a82e18265..4fa8d2fd8fd 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -129,7 +129,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC return; } - if (LastBackActivation < TInstant::Now() - ActivationPeriod) { + if (LastPeriodicBackActivation < TInstant::Now() - ActivationPeriod) { SendWaitPlanStep(GetOutdatedStep()); } diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 65c03dbb97a..beaa2f7b770 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -206,7 +206,7 @@ public: switch (Trigger) { case ETriggerActivities::POST_INSERT: - Self->EnqueueBackgroundActivities(false, true); + Self->EnqueueBackgroundActivities(false, TBackgroundActivity::Indexation()); break; case ETriggerActivities::POST_SCHEMA: Self->EnqueueBackgroundActivities(); diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index a0afaf3c3cd..b9da7837ea7 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -36,6 +36,7 @@ private: THashMap<TString, TPathIdBlobs> ExportTierBlobs; THashSet<NOlap::TEvictedBlob> BlobsToForget; ui64 ExportNo = 0; + TBackgroundActivity TriggerActivity = TBackgroundActivity::All(); }; @@ -266,12 +267,14 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) Self->IncCounter(COUNTER_COMPACTION_TIME, Ev->Get()->Duration.MilliSeconds()); } else if (changes->IsCleanup()) { Self->ActiveCleanup = false; + TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Cleanup() : TBackgroundActivity::None(); Self->BlobManager->GetCleanupBlobs(BlobsToForget); Self->IncCounter(ok ? COUNTER_CLEANUP_SUCCESS : COUNTER_CLEANUP_FAIL); } else if (changes->IsTtl()) { Self->ActiveTtl = false; + //TriggerActivity = changes->NeedRepeat ? TBackgroundActivity::Ttl() : TBackgroundActivity::None(); // Do not start new TTL till we evict current PortionsToEvict. We could evict them twice otherwise Y_VERIFY(!Self->ActiveEvictions, "Unexpected active evictions count at tablet %lu", Self->TabletID()); @@ -293,7 +296,7 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { if (Ev->Get()->PutStatus == NKikimrProto::TRYLATER) { ctx.Schedule(Self->FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true)); } else { - Self->EnqueueBackgroundActivities(); + Self->EnqueueBackgroundActivities(false, TriggerActivity); } for (auto& [tierName, pathBlobs] : ExportTierBlobs) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index bda32dc0c08..e8cb254e13d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -672,15 +672,18 @@ void TColumnShard::SetPrimaryIndex(TMap<NOlap::TSnapshot, NOlap::TIndexInfo>&& s } } -void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) { - if (periodic && LastBackActivation > TInstant::Now() - ActivationPeriod) { - return; +void TColumnShard::EnqueueBackgroundActivities(bool periodic, TBackgroundActivity activity) { + if (periodic) { + if (LastPeriodicBackActivation > TInstant::Now() - ActivationPeriod) { + return; + } + LastPeriodicBackActivation = TInstant::Now(); } const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId()); SendPeriodicStats(); - if (insertOnly) { + if (activity.IndexationOnly()) { if (auto event = SetupIndexation()) { ctx.Send(IndexingActor, event.release()); } @@ -690,44 +693,56 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) { // Preventing conflicts between indexing and compaction leads to election between them. // Indexing vs compaction probability depends on index and insert table overload status. // Prefer compaction: 25% by default; 50% if IndexOverloaded(); 6.25% if InsertTableOverloaded(). - ui32 mask = IndexOverloaded() ? 0x1 : 0x3; - if (InsertTableOverloaded()) { - mask = 0x0F; - } - bool preferIndexing = (++BackgroundActivation) & mask; + if (activity.HasIndexation() && activity.HasCompaction()) { + ui32 mask = IndexOverloaded() ? 0x1 : 0x3; + if (InsertTableOverloaded()) { + mask = 0x0F; + } + bool preferIndexing = (++BackgroundActivation) & mask; - if (preferIndexing) { + if (preferIndexing) { + if (auto evIdx = SetupIndexation()) { + ctx.Send(IndexingActor, evIdx.release()); + } else if (auto event = SetupCompaction()) { + ctx.Send(CompactionActor, event.release()); + } + } else { + if (auto event = SetupCompaction()) { + ctx.Send(CompactionActor, event.release()); + } else if (auto evIdx = SetupIndexation()) { + ctx.Send(IndexingActor, evIdx.release()); + } + } + } else if (activity.HasIndexation()) { if (auto evIdx = SetupIndexation()) { ctx.Send(IndexingActor, evIdx.release()); - } else if (auto event = SetupCompaction()) { - ctx.Send(CompactionActor, event.release()); } - } else { + } else if (activity.HasCompaction()) { if (auto event = SetupCompaction()) { ctx.Send(CompactionActor, event.release()); - } else if (auto evIdx = SetupIndexation()) { - ctx.Send(IndexingActor, evIdx.release()); } } - if (auto event = SetupCleanup()) { - ctx.Send(SelfId(), event.release()); - } else { - // Small cleanup (no index changes) - THashSet<NOlap::TEvictedBlob> blobsToForget; - BlobManager->GetCleanupBlobs(blobsToForget); - ForgetBlobs(ctx, blobsToForget); - } - - if (auto event = SetupTtl()) { - if (event->NeedDataReadWrite()) { - ctx.Send(EvictionActor, event.release()); + if (activity.HasCleanup()) { + if (auto event = SetupCleanup()) { + ctx.Send(SelfId(), event.release()); } else { - ctx.Send(SelfId(), event->TxEvent.release()); + // Small cleanup (no index changes) + THashSet<NOlap::TEvictedBlob> blobsToForget; + BlobManager->GetCleanupBlobs(blobsToForget); + ForgetBlobs(ctx, blobsToForget); } } - LastBackActivation = TInstant::Now(); + if (activity.HasTtl()) { + if (auto event = SetupTtl()) { + if (event->NeedDataReadWrite()) { + ctx.Send(EvictionActor, event.release()); + } else { + ctx.Send(SelfId(), event->TxEvent.release()); + } + } + } } std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { @@ -834,16 +849,12 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { } PrimaryIndex->UpdateCompactionLimits(CompactionLimits.Get()); - auto compactionInfo = PrimaryIndex->Compact(); + auto compactionInfo = PrimaryIndex->Compact(LastCompactedGranule); if (!compactionInfo || compactionInfo->Empty()) { LOG_S_DEBUG("Compaction not started: no portions to compact at tablet " << TabletID()); return {}; } - // TODO: Compact granules in parallel - - // Rotate compaction granules: do not choose the same granule all the time. - LastCompactedGranule = compactionInfo->ChooseOneGranule(LastCompactedGranule); Y_VERIFY(compactionInfo->Good()); LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index e7e8ba14dac..568690cf9b4 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -70,6 +70,41 @@ struct TSettings { } }; +class TBackgroundActivity { +public: + enum EBackActivity : ui32 { + NONE = 0x00, + INDEX = 0x01, + COMPACT = 0x02, + CLEAN = 0x04, + TTL = 0x08, + ALL = 0xffff + }; + + static TBackgroundActivity Indexation() { return TBackgroundActivity(INDEX); } + static TBackgroundActivity Compaction() { return TBackgroundActivity(COMPACT); } + static TBackgroundActivity Cleanup() { return TBackgroundActivity(CLEAN); } + static TBackgroundActivity Ttl() { return TBackgroundActivity(TTL); } + static TBackgroundActivity All() { return TBackgroundActivity(ALL); } + static TBackgroundActivity None() { return TBackgroundActivity(NONE); } + + TBackgroundActivity() = default; + + bool HasIndexation() const { return Activity & INDEX; } + bool HasCompaction() const { return Activity & COMPACT; } + bool HasCleanup() const { return Activity & CLEAN; } + bool HasTtl() const { return Activity & TTL; } + bool HasAll() const { return Activity == ALL; } + bool IndexationOnly() const { return Activity == INDEX; } + +private: + EBackActivity Activity = NONE; + + TBackgroundActivity(EBackActivity activity) + : Activity(activity) + {} +}; + using ITransaction = NTabletFlatExecutor::ITransaction; template <typename T> @@ -352,7 +387,7 @@ private: TDuration FailActivationDelay = TDuration::Seconds(1); TDuration StatsReportInterval = TDuration::Seconds(10); TInstant LastAccessTime; - TInstant LastBackActivation; + TInstant LastPeriodicBackActivation; TInstant LastStatsReport; TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices. @@ -430,7 +465,7 @@ private: void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort); void EnqueueProgressTx(const TActorContext& ctx); - void EnqueueBackgroundActivities(bool periodic = false, bool insertOnly = false); + void EnqueueBackgroundActivities(bool periodic = false, TBackgroundActivity activity = TBackgroundActivity::All()); void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc); void ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 4a4aecd7e91..4195fe6b619 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -36,20 +36,6 @@ struct TCompactionInfo { bool Empty() const { return Granules.empty(); } bool Good() const { return Granules.size() == 1; } - ui64 ChooseOneGranule(ui64 lastGranule) { - Y_VERIFY(Granules.size()); - - auto it = Granules.upper_bound(lastGranule); - if (it == Granules.end()) { - it = Granules.begin(); - } - - ui64 granule = *it; - Granules.clear(); - Granules.insert(granule); - return granule; - } - friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) { if (info.Good() == 1) { ui64 granule = *info.Granules.begin(); @@ -318,7 +304,7 @@ public: const THashSet<ui32>& columnIds, std::shared_ptr<TPredicate> from, std::shared_ptr<TPredicate> to) const = 0; - virtual std::unique_ptr<TCompactionInfo> Compact() = 0; + virtual std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartInsert(TVector<TInsertedData>&& dataToIndex) = 0; virtual std::shared_ptr<TColumnEngineChanges> StartCompaction(std::unique_ptr<TCompactionInfo>&& compactionInfo, const TSnapshot& outdatedSnapshot) = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 0af8f029ce5..c628a08ee06 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -775,6 +775,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T } if (affectedRecords > maxRecords) { + changes->NeedRepeat = true; break; } } @@ -806,6 +807,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T } if (affectedRecords > maxRecords) { + changes->NeedRepeat = true; break; } } @@ -1496,44 +1498,53 @@ static bool NeedSplit(const TVector<const TPortionInfo*>& actual, const TCompact || sumSize >= limits.GranuleOverloadSize; } -std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact() { - auto info = std::make_unique<TCompactionInfo>(); - info->InGranule = true; - auto& out = info->Granules; +std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompactedGranule) { + if (CompactionGranules.empty()) { + return {}; + } + + std::optional<ui64> outGranule; + bool inGranule = true; - std::vector<ui64> goodGranules; - for (ui64 granule : CompactionGranules) { + auto it = CompactionGranules.upper_bound(lastCompactedGranule); + if (it == CompactionGranules.end()) { + it = CompactionGranules.begin(); + } + + while (!CompactionGranules.empty()) { + ui64 granule = *it; auto spg = Granules.find(granule)->second; Y_VERIFY(spg); // We need only actual portions here (with empty XPlanStep:XTxId) auto actualPortions = GetActualPortions(spg->Portions); if (actualPortions.empty()) { + it = CompactionGranules.erase(it); continue; } ui32 inserted = 0; bool needSplit = NeedSplit(actualPortions, Limits, inserted); if (needSplit) { - if (info->InGranule) { - info->InGranule = false; - out.clear(); // clear in-granule candidates, we have a splitting one - } - out.insert(granule); + inGranule = false; + outGranule = granule; + break; } else if (inserted) { - if (info->InGranule) { - out.insert(granule); - } - } else { - goodGranules.push_back(granule); + outGranule = granule; + break; } - } - for (ui64 granule : goodGranules) { - CompactionGranules.erase(granule); + it = CompactionGranules.erase(it); + if (it == CompactionGranules.end()) { + it = CompactionGranules.begin(); + } } - if (!out.empty()) { + if (outGranule) { + auto info = std::make_unique<TCompactionInfo>(); + info->Granules.insert(*outGranule); + info->InGranule = inGranule; + lastCompactedGranule = *outGranule; return info; } return {}; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 2bd7b86ddaf..0b70368a7a4 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -269,7 +269,7 @@ public: const THashSet<ui32>& columnIds, std::shared_ptr<TPredicate> from, std::shared_ptr<TPredicate> to) const override; - std::unique_ptr<TCompactionInfo> Compact() override; + std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) override; // Static part of IColumnEngine iface (called from actors). It's static cause there's no threads sync. @@ -317,7 +317,7 @@ private: THashSet<ui64> GranulesInSplit; THashSet<ui64> EmptyGranules; THashMap<ui64, THashSet<ui64>> PathsGranulesOverloaded; - THashSet<ui64> CompactionGranules; + TSet<ui64> CompactionGranules; THashSet<ui64> CleanupGranules; TColumnEngineStats Counters; ui64 LastPortion; diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 2171084b2c3..f9a6aa08522 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -307,7 +307,8 @@ struct TExpected { bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, THashMap<TBlobRange, TString>&& blobs, ui32& step, const TExpected& expected) { - auto compactionInfo = engine.Compact(); + ui64 lastCompactedGranule = 0; + auto compactionInfo = engine.Compact(lastCompactedGranule); UNIT_ASSERT_VALUES_EQUAL(compactionInfo->Granules.size(), 1); UNIT_ASSERT(!compactionInfo->InGranule); |