aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-07-28 19:45:59 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-07-28 19:45:59 +0300
commit494854b865e4e9ad5847d32ba8156fb4caa3e8c7 (patch)
treebd1ace3542c42e92a846c575dec85448c9c1e99d
parent2dd5e6a379dd2cbe5025655909c28b8672602ed4 (diff)
downloadydb-494854b865e4e9ad5847d32ba8156fb4caa3e8c7.tar.gz
KIKIMR-18796:clean changes applying methods and restore guard control for compaction
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp12
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract.cpp28
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract.h34
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.cpp47
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.h18
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction_info.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction_info.h66
-rw-r--r--ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/split_compaction.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h11
14 files changed, 98 insertions, 164 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 922b5492b0..c154f08565 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -69,7 +69,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
- ApplySuccess = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot); // update changes + apply
+ ApplySuccess = Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot);
if (ApplySuccess) {
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
NOlap::TWriteIndexContext context(txc, dbWrap);
@@ -81,13 +81,13 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Self->UpdateIndexCounters();
} else {
- NOlap::TChangesFinishContext context;
+ NOlap::TChangesFinishContext context("cannot apply changes");
changes->Abort(*Self, context);
LOG_S_NOTICE(TxPrefix() << "(" << changes->TypeString() << ") cannot apply changes: "
<< *changes << TxSuffix());
}
} else {
- NOlap::TChangesFinishContext context;
+ NOlap::TChangesFinishContext context("cannot write index blobs");
changes->Abort(*Self, context);
LOG_S_ERROR(TxPrefix() << " (" << changes->TypeString() << ") cannot write index blobs" << TxSuffix());
}
@@ -103,8 +103,8 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
const ui64 blobsWritten = Ev->Get()->PutResult->GetBlobBatch().GetBlobCount();
const ui64 bytesWritten = Ev->Get()->PutResult->GetBlobBatch().GetTotalSize();
- {
- NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, ApplySuccess, Ev->Get()->Duration, TriggerActivity);
+ if (!Ev->Get()->IndexChanges->IsAborted()) {
+ NOlap::TWriteIndexCompleteContext context(ctx, blobsWritten, bytesWritten, Ev->Get()->Duration, TriggerActivity);
Ev->Get()->IndexChanges->WriteIndexComplete(*Self, context);
}
@@ -126,7 +126,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte
IncCounter(COUNTER_OUT_OF_SPACE);
ev->Get()->SetPutStatus(NKikimrProto::TRYLATER);
- NOlap::TChangesFinishContext context;
+ NOlap::TChangesFinishContext context("out of disk space");
ev->Get()->IndexChanges->Abort(*this, context);
ctx.Schedule(FailActivationDelay, new TEvPrivate::TEvPeriodicWakeup(true));
} else {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 20594b328d..445a47c382 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -764,7 +764,7 @@ void TColumnShard::SetupCompaction() {
break;
}
- LOG_S_DEBUG("Prepare " << *compactionInfo << " at tablet " << TabletID());
+ LOG_S_DEBUG("Prepare " << compactionInfo->GetGranule()->DebugString() << "/" << compactionInfo->InGranule() << " at tablet " << TabletID());
auto indexChanges = TablesManager.MutablePrimaryIndex().StartCompaction(std::move(compactionInfo), limits);
if (!indexChanges) {
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract.cpp
index b12b1cdec0..8fe2705f12 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/abstract.cpp
@@ -34,20 +34,12 @@ NKikimr::TConclusion<std::vector<TString>> TColumnEngineChanges::ConstructBlobs(
}
bool TColumnEngineChanges::ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) {
- Y_VERIFY(Stage != EStage::Aborted);
- if ((ui32)Stage >= (ui32)EStage::Applied) {
- return true;
- }
Y_VERIFY(Stage == EStage::Compiled);
if (!DoApplyChanges(self, context, dryRun)) {
- if (dryRun) {
- OnChangesApplyFailed("problems on apply");
- }
Y_VERIFY(dryRun);
return false;
} else if (!dryRun) {
- OnChangesApplyFinished();
Stage = EStage::Applied;
}
return true;
@@ -65,17 +57,11 @@ void TColumnEngineChanges::WriteIndex(NColumnShard::TColumnShard& self, TWriteIn
}
void TColumnEngineChanges::WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
- Y_VERIFY(Stage == EStage::Aborted || Stage == EStage::Written);
- if (Stage == EStage::Aborted) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("stage", Stage);
- return;
- }
- if (Stage == EStage::Written) {
- Stage = EStage::Finished;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
- DoWriteIndexComplete(self, context);
- DoOnFinish(self, context);
- }
+ Y_VERIFY(Stage == EStage::Written);
+ Stage = EStage::Finished;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
+ DoWriteIndexComplete(self, context);
+ DoOnFinish(self, context);
}
void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept {
@@ -91,13 +77,12 @@ void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept {
}
TColumnEngineChanges::~TColumnEngineChanges() {
- Y_VERIFY_DEBUG(!NActors::TlsActivationContext || Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted);
+ Y_VERIFY(!NActors::TlsActivationContext || Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted);
}
void TColumnEngineChanges::Abort(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {
Y_VERIFY(Stage != EStage::Finished && Stage != EStage::Created && Stage != EStage::Aborted);
Stage = EStage::Aborted;
- DoAbort();
DoOnFinish(self, context);
}
@@ -121,6 +106,7 @@ void TColumnEngineChanges::StartEmergency() {
void TColumnEngineChanges::AbortEmergency() {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "AbortEmergency");
Stage = EStage::Aborted;
+ OnAbortEmergency();
}
TWriteIndexContext::TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper)
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h
index 3df0bcc5ee..c609b312ed 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract.h
@@ -100,23 +100,32 @@ public:
};
class TChangesFinishContext {
+public:
+ const bool FinishedSuccessfully = true;
+ const TString ErrorMessage;
+ TChangesFinishContext(const TString& errorMessage)
+ : FinishedSuccessfully(false)
+ , ErrorMessage(errorMessage) {
+
+ }
+ TChangesFinishContext() = default;
};
class TWriteIndexCompleteContext: TNonCopyable, public TChangesFinishContext {
+private:
+ using TBase = TChangesFinishContext;
public:
const TActorContext& ActorContext;
const ui32 BlobsWritten;
const ui64 BytesWritten;
- const bool FinishedSuccessfully;
const TDuration Duration;
NColumnShard::TBackgroundActivity& TriggerActivity;
- TWriteIndexCompleteContext(const TActorContext& actorContext, const ui32 blobsWritten, const ui64 bytesWritten, const bool finishedSuccessfully, const TDuration d,
- NColumnShard::TBackgroundActivity& triggerActivity)
+ TWriteIndexCompleteContext(const TActorContext& actorContext, const ui32 blobsWritten, const ui64 bytesWritten
+ , const TDuration d, NColumnShard::TBackgroundActivity& triggerActivity)
: ActorContext(actorContext)
, BlobsWritten(blobsWritten)
, BytesWritten(bytesWritten)
- , FinishedSuccessfully(finishedSuccessfully)
, Duration(d)
, TriggerActivity(triggerActivity)
{
@@ -170,23 +179,22 @@ protected:
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) = 0;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) = 0;
- virtual void OnChangesApplyFailed(const TString& /*errorMessage*/) {
- }
- virtual void OnChangesApplyFinished() {
- }
- virtual void DoAbort() {
-
- }
virtual bool NeedConstruction() const {
return true;
}
virtual void DoStart(NColumnShard::TColumnShard& self) = 0;
virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept = 0;
+ virtual void OnAbortEmergency() {
+ }
public:
TConclusion<std::vector<TString>> ConstructBlobs(TConstructionContext& context);
virtual ~TColumnEngineChanges();
+ bool IsAborted() const {
+ return Stage == EStage::Aborted;
+ }
+
void StartEmergency();
void AbortEmergency();
@@ -216,10 +224,6 @@ public:
virtual TString TypeString() const = 0;
TString DebugString() const;
- virtual const TGranuleMeta* GetGranuleMeta() const {
- return nullptr;
- }
-
ui64 TotalBlobsSize() const {
ui64 size = 0;
for (const auto& [_, blob] : Blobs) {
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index 9c76092b4e..a3d8d5e70f 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -6,10 +6,6 @@
namespace NKikimr::NOlap {
-const TGranuleMeta* TCompactColumnEngineChanges::GetGranuleMeta() const {
- return &CompactionInfo->GetObject<TGranuleMeta>();
-}
-
void TCompactColumnEngineChanges::DoDebugString(TStringOutput& out) const {
TBase::DoDebugString(out);
if (ui32 switched = SwitchedPortions.size()) {
@@ -181,14 +177,14 @@ THashMap<ui64, ui64> TCompactColumnEngineChanges::TmpToNewGranules(TFinalization
for (const auto& [mark, counter] : TmpGranuleIds) {
if (mark == SrcGranule.Mark) {
Y_VERIFY(!counter);
- granuleRemap[counter] = SrcGranule.Granule;
+ granuleRemap[counter] = GranuleMeta->GetGranuleId();
} else {
Y_VERIFY(counter);
auto it = granuleRemap.find(counter);
if (it == granuleRemap.end()) {
it = granuleRemap.emplace(counter, context.NextGranuleId()).first;
}
- newGranules.emplace(it->second, std::make_pair(SrcGranule.PathId, mark));
+ newGranules.emplace(it->second, std::make_pair(GranuleMeta->GetPathId(), mark));
}
}
return granuleRemap;
@@ -205,7 +201,9 @@ bool TCompactColumnEngineChanges::IsMovedPortion(const TPortionInfo& info) {
void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
TBase::DoStart(self);
- self.BackgroundController.StartCompaction(CompactionInfo->GetPlanCompaction());
+ self.BackgroundController.StartCompaction(NKikimr::NOlap::TPlanCompactionInfo(GranuleMeta->GetPathId(), !IsSplit()));
+ NeedGranuleStatusProvide = true;
+ GranuleMeta->OnCompactionStarted(!IsSplit());
}
void TCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
@@ -213,13 +211,20 @@ void TCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShar
self.IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds());
}
-void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& /*context*/) {
- self.BackgroundController.FinishCompaction(CompactionInfo->GetPlanCompaction());
- CompactionInfo->GetObject<TGranuleMeta>().AllowedInsertion();
+void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) {
+ self.BackgroundController.FinishCompaction(TPlanCompactionInfo(GranuleMeta->GetPathId(), !IsSplit()));
+ GranuleMeta->AllowedInsertion();
+ Y_VERIFY(NeedGranuleStatusProvide);
+ if (context.FinishedSuccessfully) {
+ GranuleMeta->OnCompactionFinished();
+ } else {
+ GranuleMeta->OnCompactionFailed(context.ErrorMessage);
+ }
+ NeedGranuleStatusProvide = false;
}
ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark) {
- Y_VERIFY(pathId == SrcGranule.PathId);
+ Y_VERIFY(pathId == GranuleMeta->GetPathId());
if (!TmpGranuleIds.contains(mark)) {
TmpGranuleIds[mark] = FirstGranuleId;
++FirstGranuleId;
@@ -227,21 +232,25 @@ ui64 TCompactColumnEngineChanges::SetTmpGranule(ui64 pathId, const TMark& mark)
return TmpGranuleIds[mark];
}
-TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule)
+TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule)
: Limits(limits)
- , CompactionInfo(std::move(info))
- , SrcGranule(srcGranule) {
- Y_VERIFY(CompactionInfo);
- const auto& granuleInfo = CompactionInfo->GetObject<TGranuleMeta>();
+ , GranuleMeta(granule)
+ , SrcGranule(srcGranule)
+{
+ Y_VERIFY(GranuleMeta);
- SwitchedPortions.reserve(granuleInfo.GetPortions().size());
- for (const auto& [_, portionInfo] : granuleInfo.GetPortions()) {
+ SwitchedPortions.reserve(GranuleMeta->GetPortions().size());
+ for (const auto& [_, portionInfo] : GranuleMeta->GetPortions()) {
if (portionInfo.IsActive()) {
SwitchedPortions.push_back(portionInfo);
- Y_VERIFY(portionInfo.Granule() == granuleInfo.GetGranuleId());
+ Y_VERIFY(portionInfo.Granule() == GranuleMeta->GetGranuleId());
}
}
Y_VERIFY(SwitchedPortions.size());
}
+TCompactColumnEngineChanges::~TCompactColumnEngineChanges() {
+ Y_VERIFY(!NActors::TlsActivationContext || !NeedGranuleStatusProvide);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h
index 01cbc999b3..b83835d734 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.h
@@ -11,9 +11,10 @@ class TCompactColumnEngineChanges: public TChangesWithAppend {
private:
using TBase = TChangesWithAppend;
THashMap<TMark, ui32> TmpGranuleIds; // mark -> tmp granule id
+ bool NeedGranuleStatusProvide = false;
protected:
const TCompactionLimits Limits;
- std::unique_ptr<TCompactionInfo> CompactionInfo;
+ std::shared_ptr<TGranuleMeta> GranuleMeta;
TCompactionSrcGranule SrcGranule;
virtual void DoStart(NColumnShard::TColumnShard& self) override;
@@ -24,6 +25,9 @@ protected:
virtual void DoCompile(TFinalizationContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override;
virtual TPortionMeta::EProduced GetResultProducedClass() const = 0;
+ virtual void OnAbortEmergency() override {
+ NeedGranuleStatusProvide = false;
+ }
public:
virtual bool IsSplit() const = 0;
@@ -36,21 +40,13 @@ public:
std::vector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule}
std::vector<TPortionInfo> SwitchedPortions; // Portions that would be replaced by new ones
- TCompactColumnEngineChanges(const TCompactionLimits& limits, std::unique_ptr<TCompactionInfo>&& info, const TCompactionSrcGranule& srcGranule);
+ TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const TCompactionSrcGranule& srcGranule);
+ ~TCompactColumnEngineChanges();
ui64 SetTmpGranule(ui64 pathId, const TMark& mark);
THashMap<ui64, ui64> TmpToNewGranules(TFinalizationContext& context, THashMap<ui64, std::pair<ui64, TMark>>& newGranules) const;
- virtual const TGranuleMeta* GetGranuleMeta() const override;
-
- virtual void OnChangesApplyFailed(const TString& errorMessage) override {
- CompactionInfo->CompactionFailed(errorMessage);
- }
- virtual void OnChangesApplyFinished() override {
- CompactionInfo->CompactionFinished();
- }
-
ui32 NumSplitInto(const ui32 srcRows) const;
bool IsMovedPortion(const TPortionInfo& info);
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp b/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp
index 39d32ff70f..213d088db1 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction_info.cpp
@@ -3,9 +3,4 @@
namespace NKikimr::NOlap {
-NKikimr::NOlap::TPlanCompactionInfo TCompactionInfo::GetPlanCompaction() const {
- auto& granuleMeta = GetObject<TGranuleMeta>();
- return TPlanCompactionInfo(granuleMeta.GetPathId(), InGranule());
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction_info.h b/ydb/core/tx/columnshard/engines/changes/compaction_info.h
index bc69904ce0..bb88fba9f0 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction_info.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction_info.h
@@ -7,16 +7,7 @@
#include <memory>
namespace NKikimr::NOlap {
-
-class ICompactionObjectCallback {
-public:
- virtual ~ICompactionObjectCallback() = default;
- virtual void OnCompactionStarted(const bool inGranule) = 0;
- virtual void OnCompactionFinished() = 0;
- virtual void OnCompactionFailed(const TString& reason) = 0;
- virtual void OnCompactionCanceled(const TString& reason) = 0;
- virtual TString DebugString() const = 0;
-};
+class TGranuleMeta;
class TPlanCompactionInfo {
private:
@@ -45,69 +36,32 @@ public:
struct TCompactionInfo {
private:
- std::shared_ptr<ICompactionObjectCallback> CompactionObject;
- mutable bool StatusProvided = false;
+ std::shared_ptr<TGranuleMeta> GranuleMeta;
const bool InGranuleFlag = false;
public:
- TCompactionInfo(std::shared_ptr<ICompactionObjectCallback> compactionObject, const bool inGranule)
- : CompactionObject(compactionObject)
+ TCompactionInfo(std::shared_ptr<TGranuleMeta> granule, const bool inGranule)
+ : GranuleMeta(granule)
, InGranuleFlag(inGranule)
{
- Y_VERIFY(compactionObject);
- CompactionObject->OnCompactionStarted(InGranuleFlag);
+ Y_VERIFY(granule);
}
- TPlanCompactionInfo GetPlanCompaction() const;
-
bool InGranule() const {
return InGranuleFlag;
}
- template <class T>
- const T& GetObject() const {
- auto result = dynamic_cast<const T*>(CompactionObject.get());
- Y_VERIFY(result);
- return *result;
- }
-
- void CompactionFinished() const {
- Y_VERIFY(!StatusProvided);
- StatusProvided = true;
- CompactionObject->OnCompactionFinished();
- }
-
- void CompactionCanceled(const TString& reason) const {
- Y_VERIFY(!StatusProvided);
- StatusProvided = true;
- CompactionObject->OnCompactionCanceled(reason);
+ std::shared_ptr<TGranuleMeta> GetGranule() const {
+ return GranuleMeta;
}
- void CompactionFailed(const TString& reason) const {
- Y_VERIFY(!StatusProvided);
- StatusProvided = true;
- CompactionObject->OnCompactionFailed(reason);
- }
-
- ~TCompactionInfo() {
- Y_VERIFY_DEBUG(StatusProvided);
- if (!StatusProvided) {
- CompactionObject->OnCompactionFailed("compaction unexpectedly finished");
- }
- }
-
- friend IOutputStream& operator << (IOutputStream& out, const TCompactionInfo& info) {
- out << (info.InGranuleFlag ? "in granule" : "split granule") << " compaction of granule: " << info.CompactionObject->DebugString();
- return out;
- }
};
struct TCompactionSrcGranule {
- ui64 PathId = 0;
- ui64 Granule = 0;
TMark Mark;
- TCompactionSrcGranule(ui64 pathId, ui64 granule, const TMark& mark)
- : PathId(pathId), Granule(granule), Mark(mark) {
+ TCompactionSrcGranule(const TMark& mark)
+ : Mark(mark)
+ {
}
};
diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
index 7a76ab4aa1..f951fe7510 100644
--- a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
@@ -119,7 +119,6 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColum
const THashMap<TBlobRange, TString>& blobs, TConstructionContext& context) const {
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
batches.reserve(portions.size());
-
auto resultSchema = context.SchemaVersions.GetLastSchema();
TSnapshot maxSnapshot = resultSchema->GetSnapshot();
@@ -141,7 +140,7 @@ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColum
}
TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
- const ui64 pathId = SrcGranule.PathId;
+ const ui64 pathId = GranuleMeta->GetPathId();
std::vector<TString> blobs;
auto& switchedPortions = SwitchedPortions;
Y_VERIFY(switchedPortions.size());
@@ -160,13 +159,13 @@ TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstr
if (!slice || slice->num_rows() == 0) {
continue;
}
- auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, GranuleMeta.get(), context);
for (auto&& portionInfo : tmp) {
portions.emplace_back(std::move(portionInfo));
}
}
} else {
- portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GranuleMeta.get(), context);
}
Y_VERIFY(portions.size() > 0);
@@ -186,7 +185,7 @@ void TInGranuleCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TC
void TInGranuleCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
TBase::DoStart(self);
- auto& g = CompactionInfo->GetObject<NOlap::TGranuleMeta>();
+ auto& g = *GranuleMeta;
self.CSCounters.OnInternalCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
Y_VERIFY(InitInGranuleMerge(SrcGranule.Mark, SwitchedPortions, Limits, MergeBorders).Ok());
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index f5f3741284..d72dee9a8f 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -131,7 +131,7 @@ NKikimr::TConclusion<std::vector<TString>> TInsertColumnEngineChanges::DoConstru
auto granuleBatches = TMarksGranules::SliceIntoGranules(merged, PathToGranule[pathId], resultSchema->GetIndexInfo());
for (auto& [granule, batch] : granuleBatches) {
- auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, nullptr, context);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
AppendedPortions.emplace_back(std::move(portion));
diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
index 01118f3c37..6dd1ffbba3 100644
--- a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
@@ -5,7 +5,7 @@
namespace NKikimr::NOlap {
TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
- const ui64 pathId = SrcGranule.PathId;
+ const ui64 pathId = GranuleMeta->GetPathId();
const TMark ts0 = SrcGranule.Mark;
std::vector<TPortionInfo>& portions = SwitchedPortions;
@@ -102,7 +102,7 @@ TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructB
ui64 tmpGranule = SetTmpGranule(pathId, mark);
for (const auto& batch : idBatches[id]) {
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GranuleMeta.get(), context);
Y_VERIFY(newPortions.size() > 0);
for (auto& portion : newPortions) {
AppendedPortions.emplace_back(std::move(portion));
@@ -118,7 +118,7 @@ TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructB
ui64 tmpGranule = SetTmpGranule(pathId, ts);
// Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GranuleMeta.get(), context);
Y_VERIFY(portions.size() > 0);
for (auto& portion : portions) {
AppendedPortions.emplace_back(std::move(portion));
@@ -399,7 +399,7 @@ void TSplitCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColum
void TSplitCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
TBase::DoStart(self);
- auto& g = CompactionInfo->GetObject<TGranuleMeta>();
+ auto& g = *GranuleMeta;
self.CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount());
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 9d3b359b7a..690fb915e8 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -22,9 +22,9 @@ std::shared_ptr<NKikimr::NOlap::TCompactColumnEngineChanges> TColumnEngineForLog
const TCompactionLimits& limits, const TSnapshot& initSnapshot, const TCompactionSrcGranule& srcGranule) {
std::shared_ptr<TCompactColumnEngineChanges> result;
if (info->InGranule()) {
- result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info), srcGranule);
+ result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, info->GetGranule(), srcGranule);
} else {
- result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info), srcGranule);
+ result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, info->GetGranule(), srcGranule);
}
result->InitSnapshot = initSnapshot;
return result;
@@ -309,13 +309,13 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st
std::shared_ptr<TCompactColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std::unique_ptr<TCompactionInfo>&& info,
const TCompactionLimits& limits) noexcept {
- const ui64 pathId = info->GetPlanCompaction().GetPathId();
+ const ui64 pathId = info->GetGranule()->GetPathId();
Y_VERIFY(PathGranules.contains(pathId));
- auto& g = info->GetObject<TGranuleMeta>();
+ auto g = info->GetGranule();
for (const auto& [mark, pathGranule] : PathGranules[pathId]) {
- if (pathGranule == g.GetGranuleId()) {
- TCompactionSrcGranule srcGranule = TCompactionSrcGranule(pathId, g.GetGranuleId(), mark);
+ if (pathGranule == g->GetGranuleId()) {
+ TCompactionSrcGranule srcGranule = TCompactionSrcGranule(mark);
auto changes = TChangesConstructor::BuildCompactionChanges(std::move(info), limits, LastSnapshot, srcGranule);
NYDBTest::TControllers::GetColumnShardController()->OnStartCompaction(changes);
return changes;
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 61e9a959eb..bdce826b88 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -95,14 +95,6 @@ void TGranuleMeta::OnCompactionFailed(const TString& reason) {
Owner->UpdateGranuleInfo(*this);
}
-void TGranuleMeta::OnCompactionCanceled(const TString& reason) {
- AllowInsertionFlag = false;
- Y_VERIFY(Activity.erase(EActivity::InternalCompaction) || Activity.erase(EActivity::SplitCompaction));
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "OnCompactionCanceled")("reason", reason)("info", DebugString());
- CompactionPriorityInfo.OnCompactionCanceled();
- Owner->UpdateGranuleInfo(*this);
-}
-
void TGranuleMeta::OnCompactionStarted(const bool inGranule) {
AllowInsertionFlag = false;
Y_VERIFY(Activity.empty());
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 0ba027708e..25fd155f5b 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -267,7 +267,7 @@ public:
};
-class TGranuleMeta: public ICompactionObjectCallback, TNonCopyable {
+class TGranuleMeta: TNonCopyable {
public:
enum class EActivity {
SplitCompaction,
@@ -331,15 +331,14 @@ public:
return Activity.empty();
}
- virtual void OnCompactionStarted(const bool inGranule) override;
+ void OnCompactionStarted(const bool inGranule);
- virtual void OnCompactionCanceled(const TString& reason) override;
- virtual void OnCompactionFailed(const TString& reason) override;
- virtual void OnCompactionFinished() override;
+ void OnCompactionFailed(const TString& reason);
+ void OnCompactionFinished();
void UpsertPortion(const TPortionInfo& info);
- virtual TString DebugString() const override {
+ TString DebugString() const {
return TStringBuilder() << "granule:" << GetGranuleId() << ";"
<< "path_id:" << Record.PathId << ";"
<< "size:" << GetAdditiveSummary().GetGranuleSize() << ";"