aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: chertus <azuikov@ydb.tech> 2023-06-27 21:04:05 +0300
committer: chertus <azuikov@ydb.tech> 2023-06-27 21:04:05 +0300
commit: 6bbbfe451fa07002cef3192f7ccf843d1faa80eb (patch)
tree: e5d9446e6edafa28cabf09c094399122d8456b44
parent: b2a20d29e2f47fced13790aff60f9a7dbea8a353 (diff)
download: ydb-6bbbfe451fa07002cef3192f7ccf843d1faa80eb.tar.gz
fix ColumnShard counters in case of race between export and compaction
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 82
-rw-r--r-- ydb/core/tx/columnshard/engines/column_engine_logs.h | 8
-rw-r--r-- ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp | 79
3 files changed, 131 insertions, 38 deletions
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index bf4bfc44f37..81baad94632 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -220,8 +220,8 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() {
}
void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType,
- std::optional<TPortionMeta::EProduced> exProduced) {
- UpdatePortionStats(Counters, portionInfo, updateType, exProduced);
+ const TPortionInfo* exPortionInfo) {
+ UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo);
ui64 granule = portionInfo.Granule();
Y_VERIFY(granule);
@@ -233,12 +233,12 @@ void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, E
stats = std::make_shared<TColumnEngineStats>();
stats->Tables = 1;
}
- UpdatePortionStats(*PathStats[pathId], portionInfo, updateType, exProduced);
+ UpdatePortionStats(*PathStats[pathId], portionInfo, updateType, exPortionInfo);
}
void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo,
EStatsUpdateType updateType,
- std::optional<TPortionMeta::EProduced> exProduced) const {
+ const TPortionInfo* exPortionInfo) const {
TColumnEngineStats::TPortionsStats deltaStats;
ui64 columnRecords = portionInfo.Records.size();
ui64 metadataBytes = 0;
@@ -259,31 +259,35 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
}
deltaStats.Blobs = blobs.size();
deltaStats.Portions = 1;
+
+ Y_VERIFY(!exPortionInfo || exPortionInfo->Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED);
Y_VERIFY(portionInfo.Meta.Produced != TPortionMeta::EProduced::UNSPECIFIED);
- TColumnEngineStats::TPortionsStats& srcStats = exProduced
- ? engineStats.StatsByType[*exProduced]
+
+ TColumnEngineStats::TPortionsStats& srcStats = exPortionInfo
+ ? (exPortionInfo->IsActive()
+ ? engineStats.StatsByType[exPortionInfo->Meta.Produced]
+ : engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE])
: engineStats.StatsByType[portionInfo.Meta.Produced];
- auto* stats = portionInfo.IsActive()
- ? &engineStats.StatsByType[portionInfo.Meta.Produced]
- : &engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE];
+ TColumnEngineStats::TPortionsStats& stats = portionInfo.IsActive()
+ ? engineStats.StatsByType[portionInfo.Meta.Produced]
+ : engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE];
const bool isErase = updateType == EStatsUpdateType::ERASE;
- const bool isLoad = updateType == EStatsUpdateType::LOAD;
- const bool isAppended = portionInfo.IsActive() && !exProduced;
+ const bool isAdd = updateType == EStatsUpdateType::ADD;
if (isErase) { // PortionsToDrop
engineStats.ColumnRecords -= columnRecords;
engineStats.ColumnMetadataBytes -= metadataBytes;
- *stats -= deltaStats;
- } else if (isLoad || isAppended) { // AppendedPortions
+ stats -= deltaStats;
+ } else if (isAdd) { // Load || AppendedPortions
engineStats.ColumnRecords += columnRecords;
engineStats.ColumnMetadataBytes += metadataBytes;
- *stats += deltaStats;
- } else { // SwitchedPortions || PortionsToEvict
+ stats += deltaStats;
+ } else if (&srcStats != &stats) { // SwitchedPortions || PortionsToEvict
srcStats -= deltaStats;
- (*stats) += deltaStats;
+ stats += deltaStats;
}
}
@@ -319,7 +323,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
emptyGranulePaths.insert(spg->PathId());
}
for (const auto& [_, portionInfo] : spg->GetPortions()) {
- UpdatePortionStats(portionInfo, EStatsUpdateType::LOAD);
+ UpdatePortionStats(portionInfo, EStatsUpdateType::ADD);
if (portionInfo.CheckForCleanup()) {
CleanupPortions.emplace(portionInfo.GetAddress());
}
@@ -766,7 +770,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
ui64 granule = portionInfo.Granule();
ui64 portion = portionInfo.Portion();
if (!Granules.contains(granule)) {
- LOG_S_DEBUG("Cannot update unknown granule " << granule << " at tablet " << TabletId);
+ LOG_S_ERROR("Cannot update unknown granule " << granule << " at tablet " << TabletId);
return false;
}
if (!Granules[granule]->GetPortions().contains(portion)) {
@@ -786,7 +790,10 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
}
- if (!UpsertPortion(portionInfo, apply)) {
+ // In case of race with eviction portion could become evicted
+ const TPortionInfo& oldInfo = Granules[granule]->GetPortionVerified(portion);
+
+ if (!UpsertPortion(portionInfo, apply, &oldInfo)) {
LOG_S_ERROR("Cannot update portion " << portionInfo << " at tablet " << TabletId);
return false;
}
@@ -815,7 +822,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
// In case of race with compaction portion could become inactive
- // TODO: evict others instead of abort eviction
const TPortionInfo& oldInfo = Granules[granule]->GetPortionVerified(portion);
if (!oldInfo.IsActive()) {
LOG_S_WARN("Cannot evict inactive portion " << oldInfo << " at tablet " << TabletId);
@@ -823,7 +829,7 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
}
Y_VERIFY(portionInfo.TierName != oldInfo.TierName);
- if (!UpsertPortion(portionInfo, apply, true, &oldInfo)) {
+ if (!UpsertPortion(portionInfo, apply, &oldInfo)) {
LOG_S_ERROR("Cannot evict portion " << portionInfo << " at tablet " << TabletId);
return false;
}
@@ -837,15 +843,27 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
// Move portions in granules (zero-copy switch + append into new granules)
- for (auto& [info, granule] : changes.PortionsToMove) {
+ for (auto& [info, dstGranule] : changes.PortionsToMove) {
const auto& portionInfo = info;
+
+ ui64 granule = portionInfo.Granule();
+ ui64 portion = portionInfo.Portion();
+ if (!Granules.contains(granule) || !Granules[granule]->GetPortions().contains(portion)) {
+ LOG_S_ERROR("Cannot move unknown portion " << portionInfo << " at tablet " << TabletId);
+ return false;
+ }
+
+ // In case of race with eviction portion could become evicted
+ const TPortionInfo oldInfo = Granules[granule]->GetPortionVerified(portion);
+
if (!ErasePortion(portionInfo, apply, false)) {
LOG_S_ERROR("Cannot erase moved portion " << portionInfo << " at tablet " << TabletId);
return false;
}
+
TPortionInfo moved = portionInfo;
- moved.SetGranule(granule);
- if (!UpsertPortion(moved, apply, false)) {
+ moved.SetGranule(dstGranule);
+ if (!UpsertPortion(moved, apply, &oldInfo)) {
LOG_S_ERROR("Cannot insert moved portion " << moved << " at tablet " << TabletId);
return false;
}
@@ -973,8 +991,7 @@ void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark&
PathGranules[pathId].erase(mark);
}
-bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats,
- const TPortionInfo* exInfo) {
+bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool apply, const TPortionInfo* exInfo) {
ui64 granule = portionInfo.Granule();
if (!apply) {
@@ -989,14 +1006,13 @@ bool TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, bool a
Y_VERIFY(portionInfo.Valid());
auto& spg = Granules[granule];
Y_VERIFY(spg);
- if (updateStats) {
- if (exInfo) {
- Y_VERIFY(portionInfo.Meta.Produced == TPortionMeta::EProduced::EVICTED);
- UpdatePortionStats(portionInfo, EStatsUpdateType::DEFAULT, exInfo->Meta.Produced);
- } else {
- UpdatePortionStats(portionInfo);
- }
+
+ if (exInfo) {
+ UpdatePortionStats(portionInfo, EStatsUpdateType::DEFAULT, exInfo);
+ } else {
+ UpdatePortionStats(portionInfo, EStatsUpdateType::ADD);
}
+
spg->UpsertPortion(portionInfo);
return true; // It must return true if (apply == true)
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index fd7ce6dfef9..0fe8ecfc577 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -183,7 +183,7 @@ public:
enum class EStatsUpdateType {
DEFAULT = 0,
ERASE,
- LOAD
+ ADD,
};
TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits = {});
@@ -330,13 +330,13 @@ private:
/// Insert granule or check if same granule was already inserted.
bool SetGranule(const TGranuleRecord& rec, bool apply);
- bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true, const TPortionInfo* exInfo = nullptr);
+ bool UpsertPortion(const TPortionInfo& portionInfo, bool apply, const TPortionInfo* exInfo = nullptr);
bool ErasePortion(const TPortionInfo& portionInfo, bool apply, bool updateStats = true);
void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT,
- std::optional<TPortionMeta::EProduced> exProduced = {});
+ const TPortionInfo* exPortionInfo = nullptr);
void UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo,
EStatsUpdateType updateType,
- std::optional<TPortionMeta::EProduced> exProduced = {}) const;
+ const TPortionInfo* exPortionInfo = nullptr) const;
bool CanInsert(const TChanges& changes, const TSnapshot& commitSnap) const;
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 0ffca6a199e..a2026ea8355 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -134,7 +134,7 @@ bool CheckSame(const TString& blob, const TString& strSchema, ui32 expectedSize,
std::vector<TString> MakeData(const std::vector<ui64>& ts, ui32 portionSize, ui32 overlapSize, const TString& ttlColumnName,
const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema = testYdbSchema) {
- UNIT_ASSERT(ts.size() > 1);
+ UNIT_ASSERT(ts.size() > 0);
ui32 numRows = portionSize + (ts.size() - 1) * (portionSize - overlapSize);
TString testData = MakeTestBlob({0, numRows}, ydbSchema);
@@ -1160,6 +1160,79 @@ void TestDropWriteRace() {
PlanCommit(runtime, sender, ++planStep, commitTxId);
}
+void TestCompaction(std::optional<ui32> numWrites = {}) {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime,
+ CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard),
+ &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ // Create table
+
+ ui64 metaShard = TTestTxConfig::TxTablet1;
+ ui64 writeId = 0;
+ ui64 tableId = 1;
+ ui64 planStep = 100;
+ ui64 txId = 100;
+
+ bool ok = ProposeSchemaTx(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
+ NOlap::TSnapshot(++planStep, ++txId));
+ UNIT_ASSERT(ok);
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
+
+ // Set tiering
+
+ ui64 ts = 1620000000;
+ TInstant now = TAppData::TimeProvider->Now();
+ TDuration allow = TDuration::Seconds(now.Seconds() - ts + 3600);
+ TDuration disallow = TDuration::Seconds(now.Seconds() - ts - 3600);
+
+ TTestSchema::TTableSpecials spec;
+ spec.SetTtlColumn("timestamp");
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("hot").SetTtlColumn("timestamp"));
+ spec.Tiers.back().EvictAfter = disallow;
+ spec.Tiers.emplace_back(TTestSchema::TStorageTier("cold").SetTtlColumn("timestamp"));
+ spec.Tiers.back().EvictAfter = allow;
+ spec.Tiers.back().S3 = TTestSchema::TStorageTier::FakeS3();
+
+ ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 1, spec),
+ NOlap::TSnapshot(++planStep, ++txId));
+ UNIT_ASSERT(ok);
+ PlanSchemaTx(runtime, sender, NOlap::TSnapshot(planStep, txId));
+
+ ProvideTieringSnapshot(runtime, sender, TTestSchema::BuildSnapshot(spec));
+
+ // Writes
+
+ std::vector<TString> blobs = MakeData({ts}, PORTION_ROWS, 0, spec.TtlColumn);
+ const TString& triggerData = blobs[0];
+ UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
+ UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize());
+
+ if (!numWrites) {
+ numWrites = 4 * NOlap::TCompactionLimits().GranuleExpectedSize / triggerData.size();
+ }
+
+ ++planStep;
+ ++txId;
+ for (ui32 i = 0; i < *numWrites; ++i, ++writeId, ++planStep, ++txId) {
+ UNIT_ASSERT(WriteData(runtime, sender, metaShard, writeId, tableId, triggerData));
+
+ ProposeCommit(runtime, sender, metaShard, txId, {writeId});
+ PlanCommit(runtime, sender, planStep, txId);
+
+ if (i % 2 == 0) {
+ TriggerTTL(runtime, sender, NOlap::TSnapshot(++planStep, ++txId), {}, 0, spec.TtlColumn);
+ }
+ }
+}
+
}
namespace NColumnShard {
@@ -1414,6 +1487,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
// TODO: ReenableTierAfterExport
// TODO: AlterTierBorderAfterExport
+ Y_UNIT_TEST(ColdCompactionSmoke) {
+ TestCompaction();
+ }
+
Y_UNIT_TEST(Drop) {
TestDrop(false);
}