diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-04 20:20:36 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-08-04 21:21:28 +0300 |
commit | be7de67c7a6493cb9c1d9c7a78b1399791f933e7 (patch) | |
tree | b968e6b2a217925c96319110719629eae3640dc6 | |
parent | 0ba0105da161f4fe59a3cc0697e1af8e0cc7fa35 (diff) | |
download | ydb-be7de67c7a6493cb9c1d9c7a78b1399791f933e7.tar.gz |
KIKIMR-18952: private evictions list. verifications on list append.
10 files changed, 59 insertions, 39 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index ad4074f256..3ae2b271aa 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -89,12 +89,7 @@ void TColumnShard::TBackgroundController::StartTtl(const NOlap::TColumnEngineCha Y_VERIFY(ttlChanges); Y_VERIFY(ActiveTtlGranules.empty()); - for (const auto& portionInfo : ttlChanges->PortionsToDrop) { - ActiveTtlGranules.emplace(portionInfo.GetGranule()); - } - for (const auto& [portionInfo, _] : ttlChanges->PortionsToEvict) { - ActiveTtlGranules.emplace(portionInfo.GetGranule()); - } + ttlChanges->FillTouchedGranules(ActiveTtlGranules); } bool TColumnShard::TAlterMeta::Validate(const NOlap::ISnapshotSchema::TPtr& schema) const { @@ -806,7 +801,7 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u return {}; } - bool needWrites = !indexChanges->PortionsToEvict.empty(); + const bool needWrites = indexChanges->NeedConstruction(); LOG_S_INFO("TTL" << (needWrites ? " with writes" : "" ) << " prepared at tablet " << TabletID()); indexChanges->Start(*this); diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract.cpp index a10d80c0a2..b9a130c202 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract.cpp @@ -1,5 +1,6 @@ #include "abstract.h" #include <ydb/core/tablet_flat/tablet_flat_executor.h> +#include <ydb/core/tx/columnshard/engines/column_engine_logs.h> #include <ydb/core/tx/columnshard/blob_manager_db.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <library/cpp/actors/core/actor.h> @@ -36,7 +37,7 @@ NKikimr::TConclusion<std::vector<TString>> TColumnEngineChanges::ConstructBlobs( bool TColumnEngineChanges::ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) { Y_VERIFY(Stage == EStage::Compiled); - + NActors::TLogContextGuard lGuard(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", self.GetTabletId())); if (!DoApplyChanges(self, context, dryRun)) { Y_VERIFY(dryRun); return false; diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h index 56faf863fa..d695592bd5 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract.h @@ -198,6 +198,8 @@ public: return Stage == EStage::Aborted; } + virtual void FillTouchedGranules(THashSet<ui64>& granules) const = 0; + void StartEmergency(); void AbortEmergency(); diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.h b/ydb/core/tx/columnshard/engines/changes/cleanup.h index edc7ea1887..1c1c5c3808 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.h @@ -23,6 +23,12 @@ protected: } virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; public: + virtual void FillTouchedGranules(THashSet<ui64>& granules) const override { + for (const auto& portionInfo : PortionsToDrop) { + granules.emplace(portionInfo.GetGranule()); + } + } + std::vector<TPortionInfo> PortionsToDrop; bool NeedRepeat = false; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 996a003eb9..304aebe5b7 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -44,7 +44,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { portionInfo.SetGranule(it->second); } - TPortionMeta::EProduced produced = TPortionMeta::INSERTED; + TPortionMeta::EProduced produced = TPortionMeta::EProduced::INSERTED; // If it's a split compaction with moves appended portions are INSERTED (could have overlaps with others) if (PortionsToMove.empty()) { produced = producedClassResultCompaction; @@ -253,4 +253,8 @@ TCompactColumnEngineChanges::~TCompactColumnEngineChanges() { Y_VERIFY_DEBUG(!NActors::TlsActivationContext || !NeedGranuleStatusProvide); } +void TCompactColumnEngineChanges::FillTouchedGranules(THashSet<ui64>& granules) const { + granules.emplace(GranuleMeta->GetGranuleId()); +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index b83835d734..69c4a4f7d5 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -40,6 +40,8 @@ public: std::vector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule} std::vector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones + virtual void FillTouchedGranules(THashSet<ui64>& granules) const override; + TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule); ~TCompactColumnEngineChanges(); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index 3597936f4c..2705d9238e 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -32,6 +32,15 @@ public: { } + + virtual void FillTouchedGranules(THashSet<ui64>& granules) const override { + for (auto&& i : PathToGranule) { + for (auto&& g : i.second) { + granules.emplace(g.second); + } + } + } + virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override; virtual TString TypeString() const override { return "INSERT"; diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index de0c9ea536..f7ba7bdc5e 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -26,31 +26,21 @@ bool TTTLColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyC if (!TBase::DoApplyChanges(self, context, dryRun)) { return false; } - // Update evicted portions - // There could be race between compaction and eviction. Allow compaction and disallow eviction in this case. - - for (auto& [info, _] : PortionsToEvict) { - const auto& portionInfo = info; - Y_VERIFY(!portionInfo.Empty()); - Y_VERIFY(portionInfo.IsActive()); + for (auto& [portionInfo, _] : PortionsToEvict) { const ui64 granule = portionInfo.GetGranule(); const ui64 portion = portionInfo.GetPortion(); if (!self.IsPortionExists(granule, portion)) { - LOG_S_ERROR("Cannot evict unknown portion " << portionInfo << " at tablet " << self.GetTabletId()); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot evict unknown portion")("portion", portionInfo.DebugString()); return false; } - // In case of race with compaction portion could become inactive const TPortionInfo& oldInfo = self.GetGranuleVerified(granule).GetPortionVerified(portion); - if (!oldInfo.IsActive()) { - LOG_S_WARN("Cannot evict inactive portion " << oldInfo << " at tablet " << self.GetTabletId()); - return false; - } + Y_VERIFY(oldInfo.IsActive()); Y_VERIFY(portionInfo.TierName != oldInfo.TierName); if (!self.UpsertPortion(portionInfo, !dryRun, &oldInfo)) { - LOG_S_ERROR("Cannot evict portion " << portionInfo << " at tablet " << self.GetTabletId()); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot evict portion")("portion", portionInfo.DebugString()); return false; } @@ -139,7 +129,7 @@ void TTTLColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWr void TTTLColumnEngineChanges::DoCompile(TFinalizationContext& context) { TBase::DoCompile(context); for (auto& [portionInfo, _] : PortionsToEvict) { - portionInfo.UpdateRecordsMeta(TPortionMeta::EVICTED); + portionInfo.UpdateRecordsMeta(TPortionMeta::EProduced::EVICTED); } } @@ -233,9 +223,6 @@ NKikimr::TConclusion<std::vector<TString>> TTTLColumnEngineChanges::DoConstructB evicted.reserve(PortionsToEvict.size()); for (auto& [portionInfo, evictFeatures] : PortionsToEvict) { - Y_VERIFY(!portionInfo.Empty()); - Y_VERIFY(portionInfo.IsActive()); - if (UpdateEvictedPortion(portionInfo, evictFeatures, Blobs, EvictedRecords, newBlobs, context)) { Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName); diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index bc4782f9a4..630de5c984 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -14,6 +14,7 @@ private: bool UpdateEvictedPortion(TPortionInfo& portionInfo, TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs, TConstructionContext& context) const; + std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures} protected: virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; @@ -24,16 +25,32 @@ protected: virtual void DoDebugString(TStringOutput& out) const override; virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; + virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; +public: virtual bool NeedConstruction() const override { return PortionsToEvict.size(); } - virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; -public: + virtual void FillTouchedGranules(THashSet<ui64>& granules) const override { + TBase::FillTouchedGranules(granules); + for (const auto& [portionInfo, _] : PortionsToEvict) { + granules.emplace(portionInfo.GetGranule()); + } + } + std::vector<TColumnRecord> EvictedRecords; THashMap<ui64, NOlap::TTiering> Tiering; - std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures} virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override; + void AddPortionToEvict(const TPortionInfo& info, TPortionEvictionFeatures&& features) { + Y_VERIFY(!info.Empty()); + Y_VERIFY(info.IsActive()); + PortionsToEvict.emplace_back(info, std::move(features)); + } + + ui32 GetPortionsToEvictCount() const { + return PortionsToEvict.size(); + } + virtual void UpdateWritePortionInfo(const ui32 index, const TPortionInfo& info) override { PortionsToEvict[index].first = info; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index af814e84fd..9035524f2f 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -243,11 +243,9 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& // Do not count the blob as lost since it exists in the index. lostBlobs.erase(rec.BlobRange.BlobId); // Locate granule and append the record. - if (const auto gi = Granules.find(portion.GetGranule()); gi != Granules.end()) { - gi->second->AddColumnRecord(indexInfo, portion, rec); - } else { - Y_VERIFY(false); - } + const auto gi = Granules.find(portion.GetGranule()); + Y_VERIFY(gi != Granules.end()); + gi->second->AddColumnRecord(indexInfo, portion, rec); }); } @@ -478,7 +476,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", info.TierName)("to", tierName); evictionSize += info.BlobsSizes().first; const bool needExport = ttl.NeedExport(tierName); - context.Changes->PortionsToEvict.emplace_back(info, TPortionEvictionFeatures(tierName, pathId, needExport)); + context.Changes->AddPortionToEvict(info, TPortionEvictionFeatures(tierName, pathId, needExport)); SignalCounters.OnPortionToEvict(info.BlobsBytes()); } } @@ -522,7 +520,7 @@ bool TColumnEngineForLogs::DrainEvictionQueue(std::map<TMonotonic, std::vector<T AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "too early")("first", evictionsQueue.begin()->first)("now", nowMonotonic); } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stop scan")("reason", "task_ready")("first", evictionsQueue.begin()->first)("now", nowMonotonic) - ("internal", hasChanges)("evict_portions", context.Changes->PortionsToEvict.size()) + ("internal", hasChanges)("evict_portions", context.Changes->GetPortionsToEvictCount()) ("drop_portions", context.Changes->PortionsToDrop.size()); } } else { @@ -549,8 +547,7 @@ std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const TH DrainEvictionQueue(EvictionsController.MutableNextCheckInstantForTierings(), context); } - if (changes->PortionsToDrop.empty() && - changes->PortionsToEvict.empty()) { + if (changes->PortionsToDrop.empty() && !changes->GetPortionsToEvictCount()) { return nullptr; } |