summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <[email protected]>2022-05-26 21:09:13 +0300
committerArtem Zuikov <[email protected]>2022-05-26 21:09:13 +0300
commit881c69daa45d4dcf9aee50e76150291efe55d9dc (patch)
tree927cc252dc4a18b7f4cd4ce52b31f48939636313
parenta983ee268a191f2757454e98e94d36615ee1339b (diff)
KIKIMR-14990: do not clean writes for proposed txs in ColumnShard (even after 24h)
ref:97add85e647719ddda0ed362806527c077866423
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp17
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.h4
5 files changed, 20 insertions, 13 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index af19dcca408..51e0a0508fa 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -64,12 +64,21 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
ok = Self->InsertTable->Insert(dbTable, NOlap::TInsertedData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time));
if (ok) {
- auto newAborted = Self->InsertTable->AbortOld(dbTable, time);
- for (auto& writeId : newAborted) {
- Self->RemoveLongTxWrite(db, writeId);
+ auto writesToAbort = Self->InsertTable->OldWritesToAbort(time);
+ std::vector<TWriteId> failedAborts;
+ for (auto& writeId : writesToAbort) {
+ if (!Self->RemoveLongTxWrite(db, writeId)) {
+ failedAborts.push_back(writeId);
+ }
+ }
+ for (auto& writeId : failedAborts) {
+ writesToAbort.erase(writeId);
+ }
+ if (!writesToAbort.empty()) {
+ Self->InsertTable->Abort(dbTable, {}, writesToAbort);
}
- // TODO: It leads to write+erase for new aborted rows. AbortOld() inserts rows, EraseAborted() erases them.
+ // TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them.
// It's not optimal but correct.
TBlobManagerDb blobManagerDb(txc.DB);
auto allAborted = Self->InsertTable->GetAborted(); // copy (src is modified in cycle)
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 8d5171e121c..6a89c130946 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -221,15 +221,17 @@ void TColumnShard::LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLong
LongTxWritesByUniqueId[longTxId.UniqueId] = &lw;
}
-void TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId) {
+bool TColumnShard::RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId) {
if (auto* lw = LongTxWrites.FindPtr(writeId)) {
ui64 prepared = lw->PreparedTxId;
if (!prepared || txId == prepared) {
Schema::EraseLongTxWrite(db, writeId);
LongTxWritesByUniqueId.erase(lw->LongTxId.UniqueId);
LongTxWrites.erase(writeId);
+ return true;
}
}
+ return false;
}
bool TColumnShard::RemoveTx(NTable::TDatabase& database, ui64 txId) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 42c261080f0..4bbb2dc6426 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -404,7 +404,7 @@ private:
TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId);
void AddLongTxWrite(TWriteId writeId, ui64 txId);
void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId);
- void RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0);
+ bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0);
bool RemoveTx(NTable::TDatabase& database, ui64 txId);
void EnqueueProgressTx();
diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp
index 6b205df3949..6800bea30b2 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table.cpp
@@ -77,7 +77,7 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWr
}
}
-THashSet<TWriteId> TInsertTable::AbortOld(IDbWrapper& dbTable, const TInstant& now) {
+THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const {
// TODO: This protection does not save us from real flooder activity.
// This cleanup is for seldom aborts caused by rare reasons. So there's a temporary simple O(N) here
// keeping in mind we need a smarter cleanup logic here not a better algo.
@@ -93,10 +93,6 @@ THashSet<TWriteId> TInsertTable::AbortOld(IDbWrapper& dbTable, const TInstant& n
toAbort.insert(writeId);
}
}
-
- if (!toAbort.empty()) {
- Abort(dbTable, 0, toAbort);
- }
return toAbort;
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h
index 64922c289f7..11ddaa4c491 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table.h
@@ -115,7 +115,7 @@ public:
bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds);
void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds);
- THashSet<TWriteId> AbortOld(IDbWrapper& dbTable, const TInstant& now);
+ THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
void EraseCommitted(IDbWrapper& dbTable, const TInsertedData& key);
void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key);
@@ -135,7 +135,7 @@ private:
THashMap<ui64, TSet<TInsertedData>> CommittedByPathId;
THashMap<TWriteId, TInsertedData> Aborted;
THashSet<ui64> PathsOverloaded;
- TInstant LastCleanup;
+ mutable TInstant LastCleanup;
TCounters StatsPrepared;
TCounters StatsCommitted;
};