diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-28 19:45:59 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-28 19:45:59 +0300 |
commit | 494854b865e4e9ad5847d32ba8156fb4caa3e8c7 (patch) | |
tree | bd1ace3542c42e92a846c575dec85448c9c1e99d | |
parent | 2dd5e6a379dd2cbe5025655909c28b8672602ed4 (diff) | |
download | ydb-494854b865e4e9ad5847d32ba8156fb4caa3e8c7.tar.gz |
KIKIMR-18796:clean changes applying methods and restore guard control for compaction
14 files changed, 98 insertions, 164 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 922b5492b0..c154f08565 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -69,7 +69,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector); - ApplySuccess = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot); // update changes + apply + ApplySuccess = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot); if (ApplySuccess) { LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix()); NOlap::TWriteIndexContext context(txc, dbWrap); @@ -81,13 +81,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) Self->UpdateIndexCounters(); } else { - NOlap::TChangesFinishContext context; + NOlap::TChangesFinishContext context("cannot apply changes"); changes->Abort(*Self, context); LOG_S_NOTICE(TxPrefix() << "(" << changes->TypeString() << ") cannot apply changes: " << *changes << TxSuffix()); } } else { - NOlap::TChangesFinishContext context; + NOlap::TChangesFinishContext context("cannot write index blobs"); changes->Abort(*Self, context); LOG_S_ERROR(TxPrefix() << " (" << changes->TypeString() << ") cannot write index blobs" << TxSuffix()); } @@ -103,8 +103,8 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) { const ui64 blobsWritten = Ev->Get()->PutResult->GetBlobBatch().GetBlobCount(); const ui64 bytesWritten = Ev->Get()->PutResult->GetBlobBatch().GetTotalSize(); - { - NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, ApplySuccess, Ev->Get()->Duration, TriggerActivity); + if (!Ev->Get()->IndexChanges->IsAborted()) { + NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity); Ev->Get()->IndexChanges->WriteIndexComplete(*Self, context); } @@ -126,7 +126,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte IncCounter(COUNTER_OUT_OF_SPACE); ev->Get()->SetPutStatus(NKikimrProto::TRYLATER); - NOlap::TChangesFinishContext context; + NOlap::TChangesFinishContext context("out of disk space"); ev->Get()->IndexChanges->Abort(*this, context); ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true)); } else { diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 20594b328d..445a47c382 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -764,7 +764,7 @@ void TColumnShard::SetupCompaction() { break; } - LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID()); + LOG_S_DEBUG("Prepare " << compactionInfo->GetGranule()->DebugString() << "/" << compactionInfo->InGranule() << " at tablet " << TabletID()); auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), limits); if (!indexChanges) { diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract.cpp index b12b1cdec0..8fe2705f12 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract.cpp @@ -34,20 +34,12 @@ NKikimr::TConclusion<std::vector<TString>> TColumnEngineChanges::ConstructBlobs( } bool TColumnEngineChanges::ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) { - Y_VERIFY(Stage != EStage::Aborted); - if ((ui32)Stage >= (ui32)EStage::Applied) { - return true; - } Y_VERIFY(Stage == EStage::Compiled); if (!DoApplyChanges(self, context, dryRun)) { - if (dryRun) { - OnChangesApplyFailed("problems on apply"); - } Y_VERIFY(dryRun); return false; } else if (!dryRun) { - OnChangesApplyFinished(); Stage = EStage::Applied; } return true; @@ -65,17 +57,11 @@ void TColumnEngineChanges::WriteIndex(NColumnShard::TColumnShard& self, TWriteIn } void TColumnEngineChanges::WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { - Y_VERIFY(Stage == EStage::Aborted || Stage == EStage::Written); - if (Stage == EStage::Aborted) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("stage", Stage); - return; - } - if (Stage == EStage::Written) { - Stage = EStage::Finished; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully); - DoWriteIndexComplete(self, context); - DoOnFinish(self, context); - } + Y_VERIFY(Stage == EStage::Written); + Stage = EStage::Finished; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully); + DoWriteIndexComplete(self, context); + DoOnFinish(self, context); } void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept { @@ -91,13 +77,12 @@ void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept { } TColumnEngineChanges::~TColumnEngineChanges() { - Y_VERIFY_DEBUG(!NActors::TlsActivationContext || Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted); + Y_VERIFY(!NActors::TlsActivationContext || Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted); } void TColumnEngineChanges::Abort(NColumnShard::TColumnShard& self, TChangesFinishContext& context) { Y_VERIFY(Stage != EStage::Finished && Stage != EStage::Created && Stage != EStage::Aborted); Stage = EStage::Aborted; - DoAbort(); DoOnFinish(self, context); } @@ -121,6 +106,7 @@ void TColumnEngineChanges::StartEmergency() { void TColumnEngineChanges::AbortEmergency() { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "AbortEmergency"); Stage = EStage::Aborted; + OnAbortEmergency(); } TWriteIndexContext::TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper) diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h index 3df0bcc5ee..c609b312ed 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract.h @@ -100,23 +100,32 @@ public: }; class TChangesFinishContext { +public: + const bool FinishedSuccessfully = true; + const TString ErrorMessage; + TChangesFinishContext(const TString& errorMessage) + : FinishedSuccessfully(false) + , ErrorMessage(errorMessage) { + + } + TChangesFinishContext() = default; }; class TWriteIndexCompleteContext: TNonCopyable, public TChangesFinishContext { +private: + using TBase = TChangesFinishContext; public: const TActorContext& ActorContext; const ui32 BlobsWritten; const ui64 BytesWritten; - const bool FinishedSuccessfully; const TDuration Duration; NColumnShard::TBackgroundActivity& TriggerActivity; - TWriteIndexCompleteContext(const TActorContext& actorContext, const ui32 blobsWritten, const ui64 bytesWritten, const bool finishedSuccessfully, const TDuration d, - NColumnShard::TBackgroundActivity& triggerActivity) + TWriteIndexCompleteContext(const TActorContext& actorContext, const ui32 blobsWritten, const ui64 bytesWritten + , const TDuration d, NColumnShard::TBackgroundActivity& triggerActivity) : ActorContext(actorContext) , BlobsWritten(blobsWritten) , BytesWritten(bytesWritten) - , FinishedSuccessfully(finishedSuccessfully) , Duration(d) , TriggerActivity(triggerActivity) { @@ -170,23 +179,22 @@ protected: virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) = 0; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0; virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) = 0; - virtual void OnChangesApplyFailed(const TString& /*errorMessage*/) { - } - virtual void OnChangesApplyFinished() { - } - virtual void DoAbort() { - - } virtual bool NeedConstruction() const { return true; } virtual void DoStart(NColumnShard::TColumnShard& self) = 0; virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept = 0; + virtual void OnAbortEmergency() { + } public: TConclusion<std::vector<TString>> ConstructBlobs(TConstructionContext& context); virtual ~TColumnEngineChanges(); + bool IsAborted() const { + return Stage == EStage::Aborted; + } + void StartEmergency(); void AbortEmergency(); @@ -216,10 +224,6 @@ public: virtual TString TypeString() const = 0; TString DebugString() const; - virtual const TGranuleMeta* GetGranuleMeta() const { - return nullptr; - } - ui64 TotalBlobsSize() const { ui64 size = 0; for (const auto& [_, blob] : Blobs) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 9c76092b4e..a3d8d5e70f 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -6,10 +6,6 @@ namespace NKikimr::NOlap { -const TGranuleMeta* TCompactColumnEngineChanges::GetGranuleMeta() const { - return &CompactionInfo->GetObject<TGranuleMeta>(); -} - void TCompactColumnEngineChanges::DoDebugString(TStringOutput& out) const { TBase::DoDebugString(out); if (ui32 switched = SwitchedPortions.size()) { @@ -181,14 +177,14 @@ THashMap<ui64, ui64> TCompactColumnEngineChanges::TmpToNewGranules(TFinalization for (const auto& [mark, counter] : TmpGranuleIds) { if (mark == SrcGranule.Mark) { Y_VERIFY(!counter); - granuleRemap[counter] = SrcGranule.Granule; + granuleRemap[counter] = GranuleMeta->GetGranuleId(); } else { Y_VERIFY(counter); auto it = granuleRemap.find(counter); if (it == granuleRemap.end()) { it = granuleRemap.emplace(counter, context.NextGranuleId()).first; } - newGranules.emplace(it->second, std::make_pair(SrcGranule.PathId, mark)); + newGranules.emplace(it->second, std::make_pair(GranuleMeta->GetPathId(), mark)); } } return granuleRemap; @@ -205,7 +201,9 @@ bool TCompactColumnEngineChanges::IsMovedPortion(const TPortionInfo& info) { void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); - self.BackgroundController.StartCompaction(CompactionInfo->GetPlanCompaction()); + self.BackgroundController.StartCompaction(NKikimr::NOlap::TPlanCompactionInfo(GranuleMeta->GetPathId(), !IsSplit())); + NeedGranuleStatusProvide = true; + GranuleMeta->OnCompactionStarted(!IsSplit()); } void TCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { @@ -213,13 +211,20 @@ void TCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShar self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds()); } -void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) { - self.BackgroundController.FinishCompaction(CompactionInfo->GetPlanCompaction()); - CompactionInfo->GetObject<TGranuleMeta>().AllowedInsertion(); +void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) { + self.BackgroundController.FinishCompaction(TPlanCompactionInfo(GranuleMeta->GetPathId(), !IsSplit())); + GranuleMeta->AllowedInsertion(); + Y_VERIFY(NeedGranuleStatusProvide); + if (context.FinishedSuccessfully) { + GranuleMeta->OnCompactionFinished(); + } else { + GranuleMeta->OnCompactionFailed(context.ErrorMessage); + } + NeedGranuleStatusProvide = false; } ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark) { - Y_VERIFY(pathId == SrcGranule.PathId); + Y_VERIFY(pathId == GranuleMeta->GetPathId()); if (!TmpGranuleIds.contains(mark)) { TmpGranuleIds[mark] = FirstGranuleId; ++FirstGranuleId; @@ -227,21 +232,25 @@ ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark) return TmpGranuleIds[mark]; } -TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule) +TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule) : Limits(limits) - , CompactionInfo(std::move(info)) - , SrcGranule(srcGranule) { - Y_VERIFY(CompactionInfo); - const auto& granuleInfo = CompactionInfo->GetObject<TGranuleMeta>(); + , GranuleMeta(granule) + , SrcGranule(srcGranule) +{ + Y_VERIFY(GranuleMeta); - SwitchedPortions.reserve(granuleInfo.GetPortions().size()); - for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) { + SwitchedPortions.reserve(GranuleMeta->GetPortions().size()); + for (const auto& [_, portionInfo] : GranuleMeta->GetPortions()) { if (portionInfo.IsActive()) { SwitchedPortions.push_back(portionInfo); - Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId()); + Y_VERIFY(portionInfo.Granule() == GranuleMeta->GetGranuleId()); } } Y_VERIFY(SwitchedPortions.size()); } +TCompactColumnEngineChanges::~TCompactColumnEngineChanges() { + Y_VERIFY(!NActors::TlsActivationContext || !NeedGranuleStatusProvide); +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index 01cbc999b3..b83835d734 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -11,9 +11,10 @@ class TCompactColumnEngineChanges: public TChangesWithAppend { private: using TBase = TChangesWithAppend; THashMap<TMark, ui32> TmpGranuleIds; // mark -> tmp granule id + bool NeedGranuleStatusProvide = false; protected: const TCompactionLimits Limits; - std::unique_ptr<TCompactionInfo> CompactionInfo; + std::shared_ptr<TGranuleMeta> GranuleMeta; TCompactionSrcGranule SrcGranule; virtual void DoStart(NColumnShard::TColumnShard& self) override; @@ -24,6 +25,9 @@ protected: virtual void DoCompile(TFinalizationContext& context) override; virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override; virtual TPortionMeta::EProduced GetResultProducedClass() const = 0; + virtual void OnAbortEmergency() override { + NeedGranuleStatusProvide = false; + } public: virtual bool IsSplit() const = 0; @@ -36,21 +40,13 @@ public: std::vector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule} std::vector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones - TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule); + TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule); + ~TCompactColumnEngineChanges(); ui64 SetTmpGranule(ui64 pathId, const TMark& mark); THashMap<ui64, ui64> TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const; - virtual const TGranuleMeta* GetGranuleMeta() const override; - - virtual void OnChangesApplyFailed(const TString& errorMessage) override { - CompactionInfo->CompactionFailed(errorMessage); - } - virtual void OnChangesApplyFinished() override { - CompactionInfo->CompactionFinished(); - } - ui32 NumSplitInto(const ui32 srcRows) const; bool IsMovedPortion(const TPortionInfo& info); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp b/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp index 39d32ff70f..213d088db1 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp @@ -3,9 +3,4 @@ namespace NKikimr::NOlap { -NKikimr::NOlap::TPlanCompactionInfo TCompactionInfo::GetPlanCompaction() const { - auto& granuleMeta = GetObject<TGranuleMeta>(); - return TPlanCompactionInfo(granuleMeta.GetPathId(), InGranule()); -} - } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction_info.h b/ydb/core/tx/columnshard/engines/changes/compaction_info.h index bc69904ce0..bb88fba9f0 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction_info.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction_info.h @@ -7,16 +7,7 @@ #include <memory> namespace NKikimr::NOlap { - -class ICompactionObjectCallback { -public: - virtual ~ICompactionObjectCallback() = default; - virtual void OnCompactionStarted(const bool inGranule) = 0; - virtual void OnCompactionFinished() = 0; - virtual void OnCompactionFailed(const TString& reason) = 0; - virtual void OnCompactionCanceled(const TString& reason) = 0; - virtual TString DebugString() const = 0; -}; +class TGranuleMeta; class TPlanCompactionInfo { private: @@ -45,69 +36,32 @@ public: struct TCompactionInfo { private: - std::shared_ptr<ICompactionObjectCallback> CompactionObject; - mutable bool StatusProvided = false; + std::shared_ptr<TGranuleMeta> GranuleMeta; const bool InGranuleFlag = false; public: - TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule) - : CompactionObject(compactionObject) + TCompactionInfo(std::shared_ptr<TGranuleMeta> granule, const bool inGranule) + : GranuleMeta(granule) , InGranuleFlag(inGranule) { - Y_VERIFY(compactionObject); - CompactionObject->OnCompactionStarted(InGranuleFlag); + Y_VERIFY(granule); } - TPlanCompactionInfo GetPlanCompaction() const; - bool InGranule() const { return InGranuleFlag; } - template <class T> - const T& GetObject() const { - auto result = dynamic_cast<const T*>(CompactionObject.get()); - Y_VERIFY(result); - return *result; - } - - void CompactionFinished() const { - Y_VERIFY(!StatusProvided); - StatusProvided = true; - CompactionObject->OnCompactionFinished(); - } - - void CompactionCanceled(const TString& reason) const { - Y_VERIFY(!StatusProvided); - StatusProvided = true; - CompactionObject->OnCompactionCanceled(reason); + std::shared_ptr<TGranuleMeta> GetGranule() const { + return GranuleMeta; } - void CompactionFailed(const TString& reason) const { - Y_VERIFY(!StatusProvided); - StatusProvided = true; - CompactionObject->OnCompactionFailed(reason); - } - - ~TCompactionInfo() { - Y_VERIFY_DEBUG(StatusProvided); - if (!StatusProvided) { - CompactionObject->OnCompactionFailed("compaction unexpectedly finished"); - } - } - - friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) { - out << (info.InGranuleFlag ? "in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString(); - return out; - } }; struct TCompactionSrcGranule { - ui64 PathId = 0; - ui64 Granule = 0; TMark Mark; - TCompactionSrcGranule(ui64 pathId, ui64 granule, const TMark& mark) - : PathId(pathId), Granule(granule), Mark(mark) { + TCompactionSrcGranule(const TMark& mark) + : Mark(mark) + { } }; diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp index 7a76ab4aa1..f951fe7510 100644 --- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp @@ -119,7 +119,6 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColum const THashMap<TBlobRange, TString>& blobs, TConstructionContext& context) const { std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(portions.size()); - auto resultSchema = context.SchemaVersions.GetLastSchema(); TSnapshot maxSnapshot = resultSchema->GetSnapshot(); @@ -141,7 +140,7 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColum } TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { - const ui64 pathId = SrcGranule.PathId; + const ui64 pathId = GranuleMeta->GetPathId(); std::vector<TString> blobs; auto& switchedPortions = SwitchedPortions; Y_VERIFY(switchedPortions.size()); @@ -160,13 +159,13 @@ TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstr if (!slice || slice->num_rows() == 0) { continue; } - auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, GetGranuleMeta(), context); + auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, GranuleMeta.get(), context); for (auto&& portionInfo : tmp) { portions.emplace_back(std::move(portionInfo)); } } } else { - portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context); + portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GranuleMeta.get(), context); } Y_VERIFY(portions.size() > 0); @@ -186,7 +185,7 @@ void TInGranuleCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TC void TInGranuleCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); - auto& g = CompactionInfo->GetObject<NOlap::TGranuleMeta>(); + auto& g = *GranuleMeta; self.CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); Y_VERIFY(InitInGranuleMerge(SrcGranule.Mark, SwitchedPortions, Limits, MergeBorders).Ok()); } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index f5f3741284..d72dee9a8f 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -131,7 +131,7 @@ NKikimr::TConclusion<std::vector<TString>> TInsertColumnEngineChanges::DoConstru auto granuleBatches = TMarksGranules::SliceIntoGranules(merged, PathToGranule[pathId], resultSchema->GetIndexInfo()); for (auto& [granule, batch] : granuleBatches) { - auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context); + auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, nullptr, context); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { AppendedPortions.emplace_back(std::move(portion)); diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp index 01118f3c37..6dd1ffbba3 100644 --- a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NOlap { TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { - const ui64 pathId = SrcGranule.PathId; + const ui64 pathId = GranuleMeta->GetPathId(); const TMark ts0 = SrcGranule.Mark; std::vector<TPortionInfo>& portions = SwitchedPortions; @@ -102,7 +102,7 @@ TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructB ui64 tmpGranule = SetTmpGranule(pathId, mark); for (const auto& batch : idBatches[id]) { // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context); + auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GranuleMeta.get(), context); Y_VERIFY(newPortions.size() > 0); for (auto& portion : newPortions) { AppendedPortions.emplace_back(std::move(portion)); @@ -118,7 +118,7 @@ TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructB ui64 tmpGranule = SetTmpGranule(pathId, ts); // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context); + auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GranuleMeta.get(), context); Y_VERIFY(portions.size() > 0); for (auto& portion : portions) { AppendedPortions.emplace_back(std::move(portion)); @@ -399,7 +399,7 @@ void TSplitCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColum void TSplitCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); - auto& g = CompactionInfo->GetObject<TGranuleMeta>(); + auto& g = *GranuleMeta; self.CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 9d3b359b7a..690fb915e8 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -22,9 +22,9 @@ std::shared_ptr<NKikimr::NOlap::TCompactColumnEngineChanges> TColumnEngineForLog const TCompactionLimits& limits, const TSnapshot& initSnapshot, const TCompactionSrcGranule& srcGranule) { std::shared_ptr<TCompactColumnEngineChanges> result; if (info->InGranule()) { - result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info), srcGranule); + result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, info->GetGranule(), srcGranule); } else { - result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info), srcGranule); + result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, info->GetGranule(), srcGranule); } result->InitSnapshot = initSnapshot; return result; @@ -309,13 +309,13 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info, const TCompactionLimits& limits) noexcept { - const ui64 pathId = info->GetPlanCompaction().GetPathId(); + const ui64 pathId = info->GetGranule()->GetPathId(); Y_VERIFY(PathGranules.contains(pathId)); - auto& g = info->GetObject<TGranuleMeta>(); + auto g = info->GetGranule(); for (const auto& [mark, pathGranule] : PathGranules[pathId]) { - if (pathGranule == g.GetGranuleId()) { - TCompactionSrcGranule srcGranule = TCompactionSrcGranule(pathId, g.GetGranuleId(), mark); + if (pathGranule == g->GetGranuleId()) { + TCompactionSrcGranule srcGranule = TCompactionSrcGranule(mark); auto changes = TChangesConstructor::BuildCompactionChanges(std::move(info), limits, LastSnapshot, srcGranule); NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes); return changes; diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 61e9a959eb..bdce826b88 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -95,14 +95,6 @@ void TGranuleMeta::OnCompactionFailed(const TString& reason) { Owner->UpdateGranuleInfo(*this); } -void TGranuleMeta::OnCompactionCanceled(const TString& reason) { - AllowInsertionFlag = false; - Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction)); - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionCanceled")("reason", reason)("info", DebugString()); - CompactionPriorityInfo.OnCompactionCanceled(); - Owner->UpdateGranuleInfo(*this); -} - void TGranuleMeta::OnCompactionStarted(const bool inGranule) { AllowInsertionFlag = false; Y_VERIFY(Activity.empty()); diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 0ba027708e..25fd155f5b 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -267,7 +267,7 @@ public: }; -class TGranuleMeta: public ICompactionObjectCallback, TNonCopyable { +class TGranuleMeta: TNonCopyable { public: enum class EActivity { SplitCompaction, @@ -331,15 +331,14 @@ public: return Activity.empty(); } - virtual void OnCompactionStarted(const bool inGranule) override; + void OnCompactionStarted(const bool inGranule); - virtual void OnCompactionCanceled(const TString& reason) override; - virtual void OnCompactionFailed(const TString& reason) override; - virtual void OnCompactionFinished() override; + void OnCompactionFailed(const TString& reason); + void OnCompactionFinished(); void UpsertPortion(const TPortionInfo& info); - virtual TString DebugString() const override { + TString DebugString() const { return TStringBuilder() << "granule:" << GetGranuleId() << ";" << "path_id:" << Record.PathId << ";" << "size:" << GetAdditiveSummary().GetGranuleSize() << ";" |