diff options
author | nsofya <nsofya@ydb.tech> | 2023-11-02 11:39:31 +0300 |
---|---|---|
committer | nsofya <nsofya@ydb.tech> | 2023-11-02 12:04:44 +0300 |
commit | aa14f36616cf44694a09a0644717c76f91d8f917 (patch) | |
tree | 9b445608d1b55a5f819c3ff9002de2f90979ae88 | |
parent | 038efa2093da1845b7c324ecffd63028dd4df871 (diff) | |
download | ydb-aa14f36616cf44694a09a0644717c76f91d8f917.tar.gz |
KIKIMR-19804: Correct normalizers cycle
4 files changed, 15 insertions, 8 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index be0d10b21d..eef7868e55 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -222,15 +222,20 @@ public: bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) { NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("event", "initialize_shard"); - ACFL_INFO("step", "TTxUpdateSchema.Execute_Start"); + ACFL_INFO("step", "TTxUpdateSchema.Execute_Start")("details", Self->NormalizerController.DebugString()); - if (!Self->NormalizerController.IsNormalizationFinished()) { + while (!Self->NormalizerController.IsNormalizationFinished()) { auto normalizer = Self->NormalizerController.GetNormalizer(); auto result = normalizer->Init(Self->NormalizerController, txc); if (result.IsSuccess()) { NormalizerTasks = result.DetachResult(); + if (!NormalizerTasks.empty()) { + break; + } + Self->NormalizerController.SwitchNormalizer(); } else { Self->NormalizerController.GetCounters().OnNormalizerFails(); + return false; } } ACFL_INFO("step", "TTxUpdateSchema.Execute_Finish"); @@ -270,7 +275,7 @@ private: }; bool TTxApplyNormalizer::Execute(TTransactionContext& txc, const TActorContext&) { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute"); + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxApplyNormalizer.Execute")("details", Self->NormalizerController.DebugString());; Self->NormalizerController.GetNormalizer()->OnResultReady(); return Changes->Apply(txc, Self->NormalizerController); } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 02323ba18d..5de9690781 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -171,7 +171,7 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { currentIndexInfo = &VersionedIndex.GetSchema(portion.GetMinSnapshot())->GetIndexInfo(); lastSnapshot = portion.GetMinSnapshot(); } - Y_ABORT_UNLESS(portion.ValidSnapshotInfo()); + AFL_VERIFY(portion.ValidSnapshotInfo())("details", portion.DebugString()); // Locate granule and append the record. TColumnRecord rec(loadContext, *currentIndexInfo); GetGranulePtrVerified(portion.GetPathId())->AddColumnRecord(*currentIndexInfo, portion, rec, loadContext.GetPortionMeta()); diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index e637ed9e87..fba92e7f61 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -105,6 +105,10 @@ namespace NKikimr::NOlap { return StoragesManager; } + TString DebugString() const { + return TStringBuilder() << "normalizers_count=" << Normalizers.size() << ";current_normalizer=" << CurrentNormalizerIndex; + } + const INormalizerComponent::TPtr& GetNormalizer() const; bool IsNormalizationFinished() const; bool SwitchNormalizer(); diff --git a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp index 3437a471ea..8af40fea7a 100644 --- a/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/granule/normalizer.cpp @@ -55,7 +55,7 @@ public: } while (!rowset.EndOfSet()) { - if (!rowset.HaveValue<Schema::IndexColumns::PathId>()) { + if (!rowset.HaveValue<Schema::IndexColumns::PathId>() || rowset.GetValue<Schema::IndexColumns::PathId>() == 0) { TChunkData key; key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(); key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(); @@ -73,12 +73,10 @@ public: } } } - ACFL_INFO("normalizer", "TGranulesNormalizer")("message", TStringBuilder() << Chunks.size() << " portions found"); - + ACFL_INFO("normalizer", "TGranulesNormalizer")("message", TStringBuilder() << Chunks.size() << " chunks found"); if (Chunks.empty()) { return true; } - controller.GetCounters().CountObjects(Chunks.size()); { auto rowset = db.Table<Schema::IndexGranules>().Select(); |