diff options
author | chertus <azuikov@ydb.tech> | 2023-06-27 21:04:05 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-06-27 21:04:05 +0300 |
commit | 6bbbfe451fa07002cef3192f7ccf843d1faa80eb (patch) | |
tree | e5d9446e6edafa28cabf09c094399122d8456b44 | |
parent | b2a20d29e2f47fced13790aff60f9a7dbea8a353 (diff) | |
download | ydb-6bbbfe451fa07002cef3192f7ccf843d1faa80eb.tar.gz |
fix ColumnShard counters in case of a race between export and compaction
3 files changed, 131 insertions, 38 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index bf4bfc44f37..81baad94632 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -220,8 +220,8 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() { } void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType, - std::optional<TPortionMeta::EProduced> exProduced) { - UpdatePortionStats(Counters, portionInfo, updateType, exProduced); + const TPortionInfo* exPortionInfo) { + UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo); ui64 granule = portionInfo.Granule(); Y_VERIFY(granule); @@ -233,12 +233,12 @@ void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, E stats = std::make_shared<TColumnEngineStats>(); stats->Tables = 1; } - UpdatePortionStats(*PathStats[pathId], portionInfo, updateType, exProduced); + UpdatePortionStats(*PathStats[pathId], portionInfo, updateType, exPortionInfo); } void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, EStatsUpdateType updateType, - std::optional<TPortionMeta::EProduced> exProduced) const { + const TPortionInfo* exPortionInfo) const { TColumnEngineStats::TPortionsStats deltaStats; ui64 columnRecords = portionInfo.Records.size(); ui64 metadataBytes = 0; @@ -259,31 +259,35 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c } deltaStats.Blobs = blobs.size(); deltaStats.Portions = 1; + + Y_VERIFY(!exPortionInfo || exPortionInfo->Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED); Y_VERIFY(portionInfo.Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED); - TColumnEngineStats::TPortionsStats& srcStats = exProduced - ? engineStats.StatsByType[*exProduced] + + TColumnEngineStats::TPortionsStats& srcStats = exPortionInfo + ? 
(exPortionInfo->IsActive() + ? engineStats.StatsByType[exPortionInfo->Meta.Produced] + : engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]) : engineStats.StatsByType[portionInfo.Meta.Produced]; - auto* stats = portionInfo.IsActive() - ? &engineStats.StatsByType[portionInfo.Meta.Produced] - : &engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]; + TColumnEngineStats::TPortionsStats& stats = portionInfo.IsActive() + ? engineStats.StatsByType[portionInfo.Meta.Produced] + : engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE]; const bool isErase = updateType == EStatsUpdateType::ERASE; - const bool isLoad = updateType == EStatsUpdateType::LOAD; - const bool isAppended = portionInfo.IsActive() && !exProduced; + const bool isAdd = updateType == EStatsUpdateType::ADD; if (isErase) { // PortionsToDrop engineStats.ColumnRecords -= columnRecords; engineStats.ColumnMetadataBytes -= metadataBytes; - *stats -= deltaStats; - } else if (isLoad || isAppended) { // AppendedPortions + stats -= deltaStats; + } else if (isAdd) { // Load || AppendedPortions engineStats.ColumnRecords += columnRecords; engineStats.ColumnMetadataBytes += metadataBytes; - *stats += deltaStats; - } else { // SwitchedPortions || PortionsToEvict + stats += deltaStats; + } else if (&srcStats != &stats) { // SwitchedPortions || PortionsToEvict srcStats -= deltaStats; - (*stats) += deltaStats; + stats += deltaStats; } } @@ -319,7 +323,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl emptyGranulePaths.insert(spg->PathId()); } for (const auto& [_, portionInfo] : spg->GetPortions()) { - UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD); + UpdatePortionStats(portionInfo, EStatsUpdateType::ADD); if (portionInfo.CheckForCleanup()) { CleanupPortions.emplace(portionInfo.GetAddress()); } @@ -766,7 +770,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, ui64 granule = portionInfo.Granule(); ui64 portion = 
portionInfo.Portion(); if (!Granules.contains(granule)) { - LOG_S_DEBUG("Cannot update unknown granule " << granule << " at tablet " << TabletId); + LOG_S_ERROR("Cannot update unknown granule " << granule << " at tablet " << TabletId); return false; } if (!Granules[granule]->GetPortions().contains(portion)) { @@ -786,7 +790,10 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, } } - if (!UpsertPortion(portionInfo, apply)) { + // In case of race with eviction portion could become evicted + const TPortionInfo& oldInfo = Granules[granule]->GetPortionVerified(portion); + + if (!UpsertPortion(portionInfo, apply, &oldInfo)) { LOG_S_ERROR("Cannot update portion " << portionInfo << " at tablet " << TabletId); return false; } @@ -815,7 +822,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, } // In case of race with compaction portion could become inactive - // TODO: evict others instead of abort eviction const TPortionInfo& oldInfo = Granules[granule]->GetPortionVerified(portion); if (!oldInfo.IsActive()) { LOG_S_WARN("Cannot evict inactive portion " << oldInfo << " at tablet " << TabletId); @@ -823,7 +829,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, } Y_VERIFY(portionInfo.TierName != oldInfo.TierName); - if (!UpsertPortion(portionInfo, apply, true, &oldInfo)) { + if (!UpsertPortion(portionInfo, apply, &oldInfo)) { LOG_S_ERROR("Cannot evict portion " << portionInfo << " at tablet " << TabletId); return false; } @@ -837,15 +843,27 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes, // Move portions in granules (zero-copy switch + append into new granules) - for (auto& [info, granule] : changes.PortionsToMove) { + for (auto& [info, dstGranule] : changes.PortionsToMove) { const auto& portionInfo = info; + + ui64 granule = portionInfo.Granule(); + ui64 portion = portionInfo.Portion(); + if (!Granules.contains(granule) || 
!Granules[granule]->GetPortions().contains(portion)) { + LOG_S_ERROR("Cannot move unknown portion " << portionInfo << " at tablet " << TabletId); + return false; + } + + // In case of race with eviction portion could become evicted + const TPortionInfo oldInfo = Granules[granule]->GetPortionVerified(portion); + if (!ErasePortion(portionInfo, apply, false)) { LOG_S_ERROR("Cannot erase moved portion " << portionInfo << " at tablet " << TabletId); return false; } + TPortionInfo moved = portionInfo; - moved.SetGranule(granule); - if (!UpsertPortion(moved, apply, false)) { + moved.SetGranule(dstGranule); + if (!UpsertPortion(moved, apply, &oldInfo)) { LOG_S_ERROR("Cannot insert moved portion " << moved << " at tablet " << TabletId); return false; } @@ -973,8 +991,7 @@ void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& PathGranules[pathId].erase(mark); } -bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats, - const TPortionInfo* exInfo) { +bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, const TPortionInfo* exInfo) { ui64 granule = portionInfo.Granule(); if (!apply) { @@ -989,14 +1006,13 @@ bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool a Y_VERIFY(portionInfo.Valid()); auto& spg = Granules[granule]; Y_VERIFY(spg); - if (updateStats) { - if (exInfo) { - Y_VERIFY(portionInfo.Meta.Produced == TPortionMeta::EProduced::EVICTED); - UpdatePortionStats(portionInfo, EStatsUpdateType::DEFAULT, exInfo->Meta.Produced); - } else { - UpdatePortionStats(portionInfo); - } + + if (exInfo) { + UpdatePortionStats(portionInfo, EStatsUpdateType::DEFAULT, exInfo); + } else { + UpdatePortionStats(portionInfo, EStatsUpdateType::ADD); } + spg->UpsertPortion(portionInfo); return true; // It must return true if (apply == true) } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h 
index fd7ce6dfef9..0fe8ecfc577 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -183,7 +183,7 @@ public: enum class EStatsUpdateType { DEFAULT = 0, ERASE, - LOAD + ADD, }; TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {}); @@ -330,13 +330,13 @@ private: /// Insert granule or check if same granule was already inserted. bool SetGranule(const TGranuleRecord& rec, bool apply); - bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true, const TPortionInfo* exInfo = nullptr); + bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, const TPortionInfo* exInfo = nullptr); bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true); void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT, - std::optional<TPortionMeta::EProduced> exProduced = {}); + const TPortionInfo* exPortionInfo = nullptr); void UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, EStatsUpdateType updateType, - std::optional<TPortionMeta::EProduced> exProduced = {}) const; + const TPortionInfo* exPortionInfo = nullptr) const; bool CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const; diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 0ffca6a199e..a2026ea8355 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -134,7 +134,7 @@ bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize, std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName, const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = testYdbSchema) { - UNIT_ASSERT(ts.size() > 1); + 
UNIT_ASSERT(ts.size() > 0); ui32 numRows = portionSize + (ts.size() - 1) * (portionSize - overlapSize); TString testData = MakeTestBlob({0, numRows}, ydbSchema); @@ -1160,6 +1160,79 @@ void TestDropWriteRace() { PlanCommit(runtime, sender, ++planStep, commitTxId); } +void TestCompaction(std::optional<ui32> numWrites = {}) { + TTestBasicRuntime runtime; + TTester::Setup(runtime); + + TActorId sender = runtime.AllocateEdgeActor(); + CreateTestBootstrapper(runtime, + CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), + &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + // Create table + + ui64 metaShard = TTestTxConfig::TxTablet1; + ui64 writeId = 0; + ui64 tableId = 1; + ui64 planStep = 100; + ui64 txId = 100; + + bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk), + NOlap::TSnapshot(++planStep, ++txId)); + UNIT_ASSERT(ok); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); + + // Set tiering + + ui64 ts = 1620000000; + TInstant now = TAppData::TimeProvider->Now(); + TDuration allow = TDuration::Seconds(now.Seconds() - ts + 3600); + TDuration disallow = TDuration::Seconds(now.Seconds() - ts - 3600); + + TTestSchema::TTableSpecials spec; + spec.SetTtlColumn("timestamp"); + spec.Tiers.emplace_back(TTestSchema::TStorageTier("hot").SetTtlColumn("timestamp")); + spec.Tiers.back().EvictAfter = disallow; + spec.Tiers.emplace_back(TTestSchema::TStorageTier("cold").SetTtlColumn("timestamp")); + spec.Tiers.back().EvictAfter = allow; + spec.Tiers.back().S3 = TTestSchema::TStorageTier::FakeS3(); + + ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 1, spec), + NOlap::TSnapshot(++planStep, ++txId)); + UNIT_ASSERT(ok); + PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId)); + + ProvideTieringSnapshot(runtime, sender, 
TTestSchema::BuildSnapshot(spec)); + + // Writes + + std::vector<TString> blobs = MakeData({ts}, PORTION_ROWS, 0, spec.TtlColumn); + const TString& triggerData = blobs[0]; + UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); + UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize()); + + if (!numWrites) { + numWrites = 4 * NOlap::TCompactionLimits().GranuleExpectedSize / triggerData.size(); + } + + ++planStep; + ++txId; + for (ui32 i = 0; i < *numWrites; ++i, ++writeId, ++planStep, ++txId) { + UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData)); + + ProposeCommit(runtime, sender, metaShard, txId, {writeId}); + PlanCommit(runtime, sender, planStep, txId); + + if (i % 2 == 0) { + TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn); + } + } +} + } namespace NColumnShard { @@ -1414,6 +1487,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { // TODO: ReenableTierAfterExport // TODO: AlterTierBorderAfterExport + Y_UNIT_TEST(ColdCompactionSmoke) { + TestCompaction(); + } + Y_UNIT_TEST(Drop) { TestDrop(false); } |