aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-03-27 16:13:10 +0300
committerchertus <azuikov@ydb.tech>2023-03-27 16:13:10 +0300
commit3d54a0a2ffeb5d4e92fda71551f0867106255cd1 (patch)
treef1ed2db4fffcc433af49d231b2fc18dbfb87ea3d
parent5b79440688392ac5b53af0652dad5e95de3551f5 (diff)
downloadydb-3d54a0a2ffeb5d4e92fda71551f0867106255cd1.tar.gz
fix index lock in ColumnShard triggered by failes split-compations
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h1
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h1
4 files changed, 14 insertions, 5 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index b9da7837ea7..7cfbd1f3860 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -242,6 +242,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
}
+ Self->PrimaryIndex->FreeLocks(changes);
+
if (changes->IsInsert()) {
Self->ActiveIndexingOrCompaction = false;
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 4195fe6b619..bb0ef9d7c88 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -313,6 +313,7 @@ public:
virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
+ virtual void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) = 0;
virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
//virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO
virtual void UpdateCompactionLimits(const TCompactionLimits& limits) = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 716ea0ec9b4..2828cbb0788 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -985,11 +985,6 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
// Set x-snapshot to switched portions
if (changes->IsCompaction()) {
- Y_VERIFY(changes->SrcGranule);
-
- /// @warning set granule not in split even if tx would be aborted later
- GranulesInSplit.erase(changes->SrcGranule->Granule);
-
Y_VERIFY(changes->CompactionInfo);
for (auto& portionInfo : changes->SwitchedPortions) {
Y_VERIFY(portionInfo.IsActive());
@@ -1263,6 +1258,16 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, const TChanges& changes,
return true;
}
+void TColumnEngineForLogs::FreeLocks(std::shared_ptr<TColumnEngineChanges> indexChanges) {
+ auto changes = std::static_pointer_cast<TChanges>(indexChanges);
+
+ if (changes->IsCompaction()) {
+ // Set granule not in split. Do not block writes in it.
+ Y_VERIFY(changes->SrcGranule);
+ GranulesInSplit.erase(changes->SrcGranule->Granule);
+ }
+}
+
bool TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec, bool apply) {
TMark mark(rec.Mark);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 0b70368a7a4..292b4b2db2a 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -258,6 +258,7 @@ public:
ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) override;
+ void FreeLocks(std::shared_ptr<TColumnEngineChanges> changes) override;
void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override;
void UpdateCompactionLimits(const TCompactionLimits& limits) override { Limits = limits; }
const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override;