aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-10-28 14:57:58 +0300
committerchertus <azuikov@ydb.tech>2022-10-28 14:57:58 +0300
commitb547fed100d54c9b2b94871edbb2c6fd502f96a5 (patch)
tree607b574a4a5e0cbecd3b8108f7f757a1bd78b7f6
parent549e5a86afc4eb32da1f188983b548ebf850d4bf (diff)
downloadydb-b547fed100d54c9b2b94871edbb2c6fd502f96a5.tar.gz
fix race between drop and propose on ColumnShard
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp16
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp22
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h1
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.cpp44
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.h3
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp2
7 files changed, 55 insertions, 41 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 28d396b613..6e69e03d0c 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -90,7 +90,13 @@ public:
Self->BatchCache.Commit(writeId);
}
- auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds);
+ auto pathExists = [&](ui64 pathId) {
+ auto it = Self->Tables.find(pathId);
+ return it != Self->Tables.end() && !it->second.IsDropped();
+ };
+
+ auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds,
+ pathExists);
Self->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows);
Self->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes);
Self->IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes);
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 5ac459c136..1b4c54a64e 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -66,20 +66,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time);
ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
- auto writesToAbort = Self->InsertTable->OldWritesToAbort(time);
- std::vector<TWriteId> failedAborts;
- for (auto& writeId : writesToAbort) {
- if (!Self->RemoveLongTxWrite(db, writeId)) {
- failedAborts.push_back(writeId);
- }
- Self->BatchCache.EraseInserted(TWriteId(writeId));
- }
- for (auto& writeId : failedAborts) {
- writesToAbort.erase(writeId);
- }
- if (!writesToAbort.empty()) {
- Self->InsertTable->Abort(dbTable, {}, writesToAbort);
- }
+ THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time);
+ Self->TryAbortWrites(db, dbTable, std::move(writesToAbort));
// TODO: It leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them.
// It's not optimal but correct.
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index b4c7df7c7d..4fb7540df8 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -336,6 +336,22 @@ bool TColumnShard::RemoveTx(NTable::TDatabase& database, ui64 txId) {
return true;
}
+void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort) {
+ std::vector<TWriteId> failedAborts;
+ for (auto& writeId : writesToAbort) {
+ if (!RemoveLongTxWrite(db, writeId)) {
+ failedAborts.push_back(writeId);
+ }
+ BatchCache.EraseInserted(TWriteId(writeId));
+ }
+ for (auto& writeId : failedAborts) {
+ writesToAbort.erase(writeId);
+ }
+ if (!writesToAbort.empty()) {
+ InsertTable->Abort(dbTable, {}, writesToAbort);
+ }
+}
+
void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc) {
if (LastSchemaSeqNo < seqNo) {
LastSchemaSeqNo = seqNo;
@@ -519,10 +535,8 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
// TODO: Allow to read old snapshots after DROP
TBlobGroupSelector dsGroupSelector(Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- auto abortedWrites = InsertTable->DropPath(dbTable, pathId);
- for (auto& writeId : abortedWrites) {
- RemoveLongTxWrite(db, writeId);
- }
+ THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);
+ TryAbortWrites(db, dbTable, std::move(writesToAbort));
table->DropVersion = version;
Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 4faadbd09c..d0028870ea 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -435,6 +435,7 @@ private:
void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId);
bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0);
bool RemoveTx(NTable::TDatabase& database, ui64 txId);
+ void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort);
void EnqueueProgressTx(const TActorContext& ctx);
void EnqueueBackgroundActivities(bool periodic = false, bool insertOnly = false);
diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp
index 6800bea30b..64f0737e9f 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table.cpp
@@ -22,7 +22,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) {
}
TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
- const THashSet<TWriteId>& writeIds) {
+ const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists) {
Y_VERIFY(!writeIds.empty());
Y_UNUSED(metaShard);
@@ -40,14 +40,22 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep,
dbTable.EraseInserted(*data);
- data->Commit(planStep, txId);
- dbTable.Commit(*data);
-
ui32 dataSize = data->BlobSize();
- if (CommittedByPathId[data->PathId].emplace(std::move(*data)).second) {
- ++StatsCommitted.Rows;
- StatsCommitted.Bytes += dataSize;
+
+ // There could be commit after drop: propose, drop, plan
+ if (pathExists(data->PathId)) {
+ data->Commit(planStep, txId);
+ dbTable.Commit(*data);
+
+ if (CommittedByPathId[data->PathId].emplace(std::move(*data)).second) {
+ ++StatsCommitted.Rows;
+ StatsCommitted.Bytes += dataSize;
+ }
+ } else {
+ dbTable.Abort(*data);
+ Aborted.emplace(writeId, std::move(*data));
}
+
if (Inserted.erase(writeId)) {
StatsPrepared.Rows = Inserted.size();
StatsPrepared.Bytes -= dataSize;
@@ -97,19 +105,6 @@ THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const {
}
THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) {
- // Abort not committed
-
- THashSet<TWriteId> toAbort;
- for (auto& [writeId, data] : Inserted) {
- if (data.PathId == pathId) {
- toAbort.insert(writeId);
- }
- }
-
- if (!toAbort.empty()) {
- Abort(dbTable, 0, toAbort);
- }
-
// Committed -> Aborted (for future cleanup)
TSet<TInsertedData> committed = std::move(CommittedByPathId[pathId]);
@@ -129,6 +124,15 @@ THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) {
Aborted.emplace(writeId, std::move(copy));
}
+ // Return not committed writes for abort. Tablet filter this list with proposed ones befor Abort().
+
+ THashSet<TWriteId> toAbort;
+ for (auto& [writeId, data] : Inserted) {
+ if (data.PathId == pathId) {
+ toAbort.insert(writeId);
+ }
+ }
+
return toAbort;
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h
index 1245204fbe..9a9f4fba6a 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table.h
@@ -113,7 +113,8 @@ public:
};
bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
- TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds);
+ TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard,
+ const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists);
void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds);
THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const;
THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId);
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index 8c3a5239e3..72faa6ae75 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -75,7 +75,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
// commit
ui64 planStep = 100;
ui64 txId = 42;
- insertTable.Commit(dbTable, planStep, txId, metaShard, {TWriteId{writeId}});
+ insertTable.Commit(dbTable, planStep, txId, metaShard, {TWriteId{writeId}}, [](ui64){ return true; });
auto committed = insertTable.GetCommitted();
UNIT_ASSERT_EQUAL(committed.size(), 1);