aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-12-25 16:13:38 +0300
committerGitHub <noreply@github.com>2024-12-25 16:13:38 +0300
commit46419b617046ab0b7d80aa8ace358bfc8a71e810 (patch)
tree8f64c321ea88c278a9f696bd5aaeb274812786d6
parentb46a3cc13f7b807cc7c8b8e43ae50b0617d8b2fe (diff)
downloadydb-46419b617046ab0b7d80aa8ace358bfc8a71e810.tar.gz
table have to be readable with snapshot livetime (#12964)
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp3
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp3
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp15
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp8
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp39
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h36
6 files changed, 64 insertions, 40 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp
index b365f542df..7d38657d83 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp
@@ -15,9 +15,10 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
auto& index = Self->MutableIndexAs<NOlap::TColumnEngineForLogs>();
+ const auto minReadSnapshot = Self->GetMinReadSnapshot();
for (auto&& pack : Packs) {
const auto& writeMeta = pack.GetWriteMeta();
- AFL_VERIFY(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
+ AFL_VERIFY(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
AFL_VERIFY(!writeMeta.HasLongTxId());
auto operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index 8bbcfce0e0..52d3bed0a6 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -40,9 +40,10 @@ bool TTxWrite::DoExecute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_BLOBS)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
+ const auto minReadSnapshot = Self->GetMinReadSnapshot();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteMeta();
- Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
+ Y_ABORT_UNLESS(Self->TablesManager.IsReadyForFinishWrite(writeMeta.GetTableId(), minReadSnapshot));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 68a83bf1a1..a3c81d2083 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -142,17 +142,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
"writing_id", writeMeta.GetId())("status", putResult.GetPutStatus());
Counters.GetWritesMonitor()->OnFinishWrite(aggr->GetSize(), 1);
- if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) {
- ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex());
- Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
-
- auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
- ctx.Send(writeMeta.GetSource(), result.release());
- Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable);
- wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
- continue;
- }
-
if (putResult.GetPutStatus() != NKikimrProto::OK) {
Counters.GetCSCounters().OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant());
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
@@ -238,7 +227,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);
}
- if (!TablesManager.IsReadyForWrite(pathId)) {
+ if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index")
<< " at tablet " << TabletID());
@@ -558,7 +547,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const auto pathId = operation.GetTableId().GetTableId();
- if (!TablesManager.IsReadyForWrite(pathId)) {
+ if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
sendError("table not writable", NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR);
return;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index f68cc93dd4..d9017da74e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -1051,8 +1051,10 @@ void TColumnShard::SetupCleanupPortions() {
return;
}
- auto changes =
- TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager);
+ const NOlap::TSnapshot minReadSnapshot = GetMinReadSnapshot();
+ THashSet<ui64> pathsToDrop = TablesManager.GetPathsToDrop(minReadSnapshot);
+
+ auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(minReadSnapshot, pathsToDrop, DataLocksManager);
if (!changes) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
return;
@@ -1077,7 +1079,7 @@ void TColumnShard::SetupCleanupTables() {
}
THashSet<ui64> pathIdsEmptyInInsertTable;
- for (auto&& i : TablesManager.GetPathsToDrop()) {
+ for (auto&& i : TablesManager.GetPathsToDrop(GetMinReadSnapshot())) {
if (InsertTable->HasPathIdData(i)) {
continue;
}
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index 63a74e12fb..cd6eda199c 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -60,7 +60,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return false;
}
if (table.IsDropped()) {
- PathsToDrop.insert(table.GetPathId());
+ AFL_VERIFY(PathsToDrop[table.GetDropVersionVerified()].emplace(table.GetPathId()).second);
}
AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second);
@@ -201,19 +201,23 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return true;
}
-bool TTablesManager::HasTable(const ui64 pathId, bool withDeleted) const {
+bool TTablesManager::HasTable(const ui64 pathId, const bool withDeleted, const std::optional<NOlap::TSnapshot> minReadSnapshot) const {
auto it = Tables.find(pathId);
if (it == Tables.end()) {
return false;
}
- if (it->second.IsDropped()) {
+ if (it->second.IsDropped(minReadSnapshot)) {
return withDeleted;
}
return true;
}
-bool TTablesManager::IsReadyForWrite(const ui64 pathId) const {
- return HasPrimaryIndex() && HasTable(pathId);
+bool TTablesManager::IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const {
+ return HasPrimaryIndex() && HasTable(pathId, withDeleted);
+}
+
+bool TTablesManager::IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const {
+ return HasPrimaryIndex() && HasTable(pathId, false, minReadSnapshot);
}
bool TTablesManager::HasPreset(const ui32 presetId) const {
@@ -237,7 +241,7 @@ void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& versio
AFL_VERIFY(Tables.contains(pathId));
auto& table = Tables[pathId];
table.SetDropVersion(version);
- PathsToDrop.insert(pathId);
+ AFL_VERIFY(PathsToDrop[version].emplace(pathId).second);
Ttl.erase(pathId);
Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId());
}
@@ -363,13 +367,15 @@ TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& s
}
bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const {
- auto itDrop = PathsToDrop.find(pathId);
+ const auto& itTable = Tables.find(pathId);
+ AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
+ auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
AFL_VERIFY(itDrop != PathsToDrop.end());
+ AFL_VERIFY(itDrop->second.contains(pathId));
+
AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
NIceDb::TNiceDb db(dbTable);
NColumnShard::Schema::EraseTableInfo(db, pathId);
- const auto& itTable = Tables.find(pathId);
- AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
for (auto&& tableVersion : itTable->second.GetVersions()) {
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion);
}
@@ -377,13 +383,18 @@ bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, co
}
bool TTablesManager::TryFinalizeDropPathOnComplete(const ui64 pathId) {
- auto itDrop = PathsToDrop.find(pathId);
- AFL_VERIFY(itDrop != PathsToDrop.end());
- AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
- AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
- PathsToDrop.erase(itDrop);
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
+ {
+ auto itDrop = PathsToDrop.find(itTable->second.GetDropVersionVerified());
+ AFL_VERIFY(itDrop != PathsToDrop.end());
+ AFL_VERIFY(itDrop->second.erase(pathId));
+ if (itDrop->second.empty()) {
+ PathsToDrop.erase(itDrop);
+ }
+ }
+ AFL_VERIFY(!GetPrimaryIndexSafe().HasDataInPathId(pathId));
+ AFL_VERIFY(MutablePrimaryIndex().ErasePathId(pathId));
Tables.erase(itTable);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("method", "TryFinalizeDropPathOnComplete")("path_id", pathId)("size", Tables.size());
return true;
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 2f8d418328..f3fc89b258 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -105,7 +105,13 @@ public:
return PathId;
}
+ const NOlap::TSnapshot& GetDropVersionVerified() const {
+ AFL_VERIFY(DropVersion);
+ return *DropVersion;
+ }
+
void SetDropVersion(const NOlap::TSnapshot& version) {
+ AFL_VERIFY(!DropVersion)("exists", DropVersion->DebugString())("version", version.DebugString());
DropVersion = version;
}
@@ -113,8 +119,14 @@ public:
Versions.insert(snapshot);
}
- bool IsDropped() const {
- return DropVersion.has_value();
+ bool IsDropped(const std::optional<NOlap::TSnapshot>& minReadSnapshot = std::nullopt) const {
+ if (!DropVersion) {
+ return false;
+ }
+ if (!minReadSnapshot) {
+ return true;
+ }
+ return *DropVersion < *minReadSnapshot;
}
TTableInfo() = default;
@@ -139,7 +151,7 @@ private:
THashMap<ui64, TTableInfo> Tables;
THashSet<ui32> SchemaPresetsIds;
THashMap<ui32, NKikimrSchemeOp::TColumnTableSchema> ActualSchemaForPreset;
- THashSet<ui64> PathsToDrop;
+ std::map<NOlap::TSnapshot, THashSet<ui64>> PathsToDrop;
THashMap<ui64, NOlap::TTiering> Ttl;
std::unique_ptr<NOlap::IColumnEngine> PrimaryIndex;
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
@@ -166,12 +178,19 @@ public:
return Ttl;
}
- const THashSet<ui64>& GetPathsToDrop() const {
+ const std::map<NOlap::TSnapshot, THashSet<ui64>>& GetPathsToDrop() const {
return PathsToDrop;
}
- THashSet<ui64>& MutablePathsToDrop() {
- return PathsToDrop;
+ THashSet<ui64> GetPathsToDrop(const NOlap::TSnapshot& minReadSnapshot) const {
+ THashSet<ui64> result;
+ for (auto&& i : PathsToDrop) {
+ if (minReadSnapshot < i.first) {
+ break;
+ }
+ result.insert(i.second.begin(), i.second.end());
+ }
+ return result;
}
const THashMap<ui64, TTableInfo>& GetTables() const {
@@ -236,8 +255,9 @@ public:
const TTableInfo& GetTable(const ui64 pathId) const;
ui64 GetMemoryUsage() const;
- bool HasTable(const ui64 pathId, bool withDeleted = false) const;
- bool IsReadyForWrite(const ui64 pathId) const;
+ bool HasTable(const ui64 pathId, const bool withDeleted = false, const std::optional<NOlap::TSnapshot> minReadSnapshot = std::nullopt) const;
+ bool IsReadyForStartWrite(const ui64 pathId, const bool withDeleted) const;
+ bool IsReadyForFinishWrite(const ui64 pathId, const NOlap::TSnapshot& minReadSnapshot) const;
bool HasPreset(const ui32 presetId) const;
void DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db);