diff options
author | chertus <azuikov@ydb.tech> | 2022-10-28 14:57:58 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-10-28 14:57:58 +0300 |
commit | b547fed100d54c9b2b94871edbb2c6fd502f96a5 (patch) | |
tree | 607b574a4a5e0cbecd3b8108f7f757a1bd78b7f6 | |
parent | 549e5a86afc4eb32da1f188983b548ebf850d4bf (diff) | |
download | ydb-b547fed100d54c9b2b94871edbb2c6fd502f96a5.tar.gz |
fix race between drop and propose on ColumnShard
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 16 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 22 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/insert_table.cpp | 44 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/insert_table.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_insert_table.cpp | 2 |
7 files changed, 55 insertions, 41 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 28d396b613..6e69e03d0c 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -90,7 +90,13 @@ public: Self->BatchCache.Commit(writeId); } - auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds); + auto pathExists = [&](ui64 pathId) { + auto it = Self->Tables.find(pathId); + return it != Self->Tables.end() && !it->second.IsDropped(); + }; + + auto counters = Self->InsertTable->Commit(dbTable, step, txId, meta.MetaShard, meta.WriteIds, + pathExists); Self->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); Self->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); Self->IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 5ac459c136..1b4c54a64e 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -66,20 +66,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { NOlap::TInsertedData insertData(metaShard, writeId, tableId, dedupId, logoBlobId, metaStr, time); ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { - auto writesToAbort = Self->InsertTable->OldWritesToAbort(time); - std::vector<TWriteId> failedAborts; - for (auto& writeId : writesToAbort) { - if (!Self->RemoveLongTxWrite(db, writeId)) { - failedAborts.push_back(writeId); - } - Self->BatchCache.EraseInserted(TWriteId(writeId)); - } - for (auto& writeId : failedAborts) { - writesToAbort.erase(writeId); - } - if (!writesToAbort.empty()) { - Self->InsertTable->Abort(dbTable, {}, writesToAbort); - } + THashSet<TWriteId> writesToAbort = Self->InsertTable->OldWritesToAbort(time); + Self->TryAbortWrites(db, dbTable, std::move(writesToAbort)); // TODO: It 
leads to write+erase for aborted rows. Abort() inserts rows, EraseAborted() erases them. // It's not optimal but correct. diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index b4c7df7c7d..4fb7540df8 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -336,6 +336,22 @@ bool TColumnShard::RemoveTx(NTable::TDatabase& database, ui64 txId) { return true; } +void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort) { + std::vector<TWriteId> failedAborts; + for (auto& writeId : writesToAbort) { + if (!RemoveLongTxWrite(db, writeId)) { + failedAborts.push_back(writeId); + } + BatchCache.EraseInserted(TWriteId(writeId)); + } + for (auto& writeId : failedAborts) { + writesToAbort.erase(writeId); + } + if (!writesToAbort.empty()) { + InsertTable->Abort(dbTable, {}, writesToAbort); + } +} + void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc) { if (LastSchemaSeqNo < seqNo) { LastSchemaSeqNo = seqNo; @@ -519,10 +535,8 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt // TODO: Allow to read old snapshots after DROP TBlobGroupSelector dsGroupSelector(Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - auto abortedWrites = InsertTable->DropPath(dbTable, pathId); - for (auto& writeId : abortedWrites) { - RemoveLongTxWrite(db, writeId); - } + THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId); + TryAbortWrites(db, dbTable, std::move(writesToAbort)); table->DropVersion = version; Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 4faadbd09c..d0028870ea 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ 
-435,6 +435,7 @@ private: void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId); bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0); bool RemoveTx(NTable::TDatabase& database, ui64 txId); + void TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTable, THashSet<TWriteId>&& writesToAbort); void EnqueueProgressTx(const TActorContext& ctx); void EnqueueBackgroundActivities(bool periodic = false, bool insertOnly = false); diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp index 6800bea30b..64f0737e9f 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table.cpp @@ -22,7 +22,7 @@ bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { } TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, - const THashSet<TWriteId>& writeIds) { + const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists) { Y_VERIFY(!writeIds.empty()); Y_UNUSED(metaShard); @@ -40,14 +40,22 @@ TInsertTable::TCounters TInsertTable::Commit(IDbWrapper& dbTable, ui64 planStep, dbTable.EraseInserted(*data); - data->Commit(planStep, txId); - dbTable.Commit(*data); - ui32 dataSize = data->BlobSize(); - if (CommittedByPathId[data->PathId].emplace(std::move(*data)).second) { - ++StatsCommitted.Rows; - StatsCommitted.Bytes += dataSize; + + // There could be commit after drop: propose, drop, plan + if (pathExists(data->PathId)) { + data->Commit(planStep, txId); + dbTable.Commit(*data); + + if (CommittedByPathId[data->PathId].emplace(std::move(*data)).second) { + ++StatsCommitted.Rows; + StatsCommitted.Bytes += dataSize; + } + } else { + dbTable.Abort(*data); + Aborted.emplace(writeId, std::move(*data)); } + if (Inserted.erase(writeId)) { StatsPrepared.Rows = Inserted.size(); StatsPrepared.Bytes -= dataSize; @@ -97,19 +105,6 @@ THashSet<TWriteId> 
TInsertTable::OldWritesToAbort(const TInstant& now) const { } THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) { - // Abort not committed - - THashSet<TWriteId> toAbort; - for (auto& [writeId, data] : Inserted) { - if (data.PathId == pathId) { - toAbort.insert(writeId); - } - } - - if (!toAbort.empty()) { - Abort(dbTable, 0, toAbort); - } - // Committed -> Aborted (for future cleanup) TSet<TInsertedData> committed = std::move(CommittedByPathId[pathId]); @@ -129,6 +124,15 @@ THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) { Aborted.emplace(writeId, std::move(copy)); } + // Return not committed writes for abort. Tablet filters this list with proposed ones before Abort(). + + THashSet<TWriteId> toAbort; + for (auto& [writeId, data] : Inserted) { + if (data.PathId == pathId) { + toAbort.insert(writeId); + } + } + return toAbort; } diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h index 1245204fbe..9a9f4fba6a 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table.h @@ -113,7 +113,8 @@ public: }; bool Insert(IDbWrapper& dbTable, TInsertedData&& data); - TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds); + TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, + const THashSet<TWriteId>& writeIds, std::function<bool(ui64)> pathExists); void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds); THashSet<TWriteId> OldWritesToAbort(const TInstant& now) const; THashSet<TWriteId> DropPath(IDbWrapper& dbTable, ui64 pathId); diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index 8c3a5239e3..72faa6ae75 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -75,7
+75,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { // commit ui64 planStep = 100; ui64 txId = 42; - insertTable.Commit(dbTable, planStep, txId, metaShard, {TWriteId{writeId}}); + insertTable.Commit(dbTable, planStep, txId, metaShard, {TWriteId{writeId}}, [](ui64){ return true; }); auto committed = insertTable.GetCommitted(); UNIT_ASSERT_EQUAL(committed.size(), 1); |