diff options
| author | Artem Zuikov <[email protected]> | 2022-05-26 21:09:13 +0300 |
|---|---|---|
| committer | Artem Zuikov <[email protected]> | 2022-05-26 21:09:13 +0300 |
| commit | 881c69daa45d4dcf9aee50e76150291efe55d9dc (patch) | |
| tree | 927cc252dc4a18b7f4cd4ce52b31f48939636313 | |
| parent | a983ee268a191f2757454e98e94d36615ee1339b (diff) | |
KIKIMR-14990: do not clean writes for proposed txs in ColumnShard (even after 24h)
ref:97add85e647719ddda0ed362806527c077866423
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 17 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 2 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/engines/insert_table.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/tx/columnshard/engines/insert_table.h | 4 |
5 files changed, 20 insertions, 13 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index af19dcca408..51e0a0508fa 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -64,12 +64,21 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); ok = Self->InsertTable->Insert(dbTable, NOlap::TInsertedData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time)); if (ok) { - auto newAborted = Self->InsertTable->AbortOld(dbTable, time); - for (auto& writeId : newAborted) { - Self->RemoveLongTxWrite(db, writeId); + auto writesToAbort = Self->InsertTable->OldWritesToAbort(time); + std::vector<TWriteId> failedAborts; + for (auto& writeId : writesToAbort) { + if (!Self->RemoveLongTxWrite(db, writeId)) { + failedAborts.push_back(writeId); + } + } + for (auto& writeId : failedAborts) { + writesToAbort.erase(writeId); + } + if (!writesToAbort.empty()) { + Self->InsertTable->Abort(dbTable, {}, writesToAbort); } - // TODO: It leads to write+erase for new aborted rows. AbortOld() inserts rows, EraseAborted() erases them. + // TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them. // It's not optimal but correct. TBlobManagerDb blobManagerDb(txc.DB); auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 8d5171e121c..6a89c130946 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -221,15 +221,17 @@ void TColumnShard::LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLong LongTxWritesByUniqueId[longTxId.UniqueId] = &lw; } -void TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId) { +bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId) { if (auto* lw = LongTxWrites.FindPtr(writeId)) { ui64 prepared = lw->PreparedTxId; if (!prepared || txId == prepared) { Schema::EraseLongTxWrite(db, writeId); LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId); LongTxWrites.erase(writeId); + return true; } } + return false; } bool TColumnShard::RemoveTx(NTable::TDatabase& database, ui64 txId) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 42c261080f0..4bbb2dc6426 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -404,7 +404,7 @@ private: TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId); void AddLongTxWrite(TWriteId writeId, ui64 txId); void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId); - void RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0); + bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0); bool RemoveTx(NTable::TDatabase& database, ui64 txId); void EnqueueProgressTx(); diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp index 6b205df3949..6800bea30b2 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table.cpp @@ -77,7 +77,7 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWr } } -THashSet<TWriteId> TInsertTable::AbortOld(IDbWrapper& dbTable, const TInstant& now) { +THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const { // TODO: This protection does not save us from real flooder activity. // This cleanup is for seldom aborts caused by rare reasons. So there's a temporary simple O(N) here // keeping in mind we need a smarter cleanup logic here not a better algo. @@ -93,10 +93,6 @@ THashSet<TWriteId> TInsertTable::AbortOld(IDbWrapper& dbTable, const TInstant& n toAbort.insert(writeId); } } - - if (!toAbort.empty()) { - Abort(dbTable, 0, toAbort); - } return toAbort; } diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h index 64922c289f7..11ddaa4c491 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table.h @@ -115,7 +115,7 @@ public: bool Insert(IDbWrapper& dbTable, TInsertedData&& data); TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds); void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds); - THashSet<TWriteId> AbortOld(IDbWrapper& dbTable, const TInstant& now); + THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const; THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId); void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key); void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key); @@ -135,7 +135,7 @@ private: THashMap<ui64, TSet<TInsertedData>> CommittedByPathId; THashMap<TWriteId, TInsertedData> Aborted; THashSet<ui64> PathsOverloaded; - TInstant LastCleanup; + mutable TInstant LastCleanup; TCounters StatsPrepared; TCounters StatsCommitted; }; |
