diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-06-09 13:07:22 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-06-09 13:07:22 +0300 |
commit | d1c4287af422f0fb30b901b8d67f8c46c28cfa47 (patch) | |
tree | e3f713ab366a72136046958eea5c7f933d123cd6 | |
parent | f4e9c332e8837d3d34870573987b89b6795eca02 (diff) | |
download | ydb-d1c4287af422f0fb30b901b8d67f8c46c28cfa47.tar.gz |
Return nullptr if there is neither indexes nor streams KIKIMR-14732
ref:285cfd6561a8b8446470ec3c09e9422b88d1b782
-rw-r--r-- | ydb/core/tx/datashard/change_collector.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_common_upload.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_direct_erase.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_direct_erase.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp | 18 |
5 files changed, 39 insertions, 17 deletions
diff --git a/ydb/core/tx/datashard/change_collector.cpp b/ydb/core/tx/datashard/change_collector.cpp index be21b22cbf..216d0876a9 100644 --- a/ydb/core/tx/datashard/change_collector.cpp +++ b/ydb/core/tx/datashard/change_collector.cpp @@ -87,13 +87,20 @@ private: }; // TChangeCollectorProxy IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) { + const bool hasAsyncIndexes = table.HasAsyncIndexes(); + const bool hasCdcStreams = table.HasCdcStreams(); + + if (!hasAsyncIndexes && !hasCdcStreams) { + return nullptr; + } + auto proxy = MakeHolder<TChangeCollectorProxy>(); - if (table.HasAsyncIndexes()) { + if (hasAsyncIndexes) { proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, db, isImmediateTx)); } - if (table.HasCdcStreams()) { + if (hasCdcStreams) { proxy->AddUnderlying(MakeHolder<TCdcStreamChangeCollector>(&dataShard, db, isImmediateTx)); } diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index a3cbd5a476..f94c60bb11 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -59,6 +59,9 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans if (CollectChanges) { ChangeCollector.Reset(CreateChangeCollector(*self, txc.DB, tableInfo, true)); + } + + if (ChangeCollector) { ChangeCollector->SetWriteVersion(writeVersion); if (ChangeCollector->NeedToReadKeys()) { ChangeCollector->SetReadVersion(readVersion); diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index 574c5ff647..422b64db5e 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -66,9 +66,12 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, params.Txc->DB, tableInfo, true)); - params.Tx->ChangeCollector->SetWriteVersion(params.WriteVersion); - if (params.Tx->ChangeCollector->NeedToReadKeys()) { - params.Tx->ChangeCollector->SetReadVersion(params.ReadVersion); + } + + if (auto collector = params.GetChangeCollector()) { + collector->SetWriteVersion(params.WriteVersion); + if (collector->NeedToReadKeys()) { + collector->SetReadVersion(params.ReadVersion); } } @@ -133,9 +136,10 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } } - Y_VERIFY(params.Tx->ChangeCollector); - if (!params.Tx->ChangeCollector->Collect(fullTableId, NTable::ERowOp::Erase, key, {})) { - pageFault = true; + if (auto collector = params.GetChangeCollector()) { + if (!collector->Collect(fullTableId, NTable::ERowOp::Erase, key, {})) { + pageFault = true; + } } if (pageFault) { @@ -147,8 +151,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } if (pageFault) { - if (params && params.Tx->ChangeCollector) { - params.Tx->ChangeCollector->Reset(); + if (auto collector = params.GetChangeCollector()) { + collector->Reset(); } return EStatus::PageFault; diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h index 7663b47d38..d526178b9d 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.h +++ b/ydb/core/tx/datashard/datashard_direct_erase.h @@ -52,6 +52,10 @@ class TDirectTxErase : public IDirectTx { return true; } + + IChangeCollector* GetChangeCollector() const { + return Tx ? Tx->ChangeCollector.Get() : nullptr; + } }; static EStatus CheckedExecute( diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index 9b62c04a72..f16db8eaa3 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -39,22 +39,26 @@ public: const auto& eraseTx = tx->GetDistributedEraseTx(); const auto& request = eraseTx->GetRequest(); - const auto versions = DataShard.GetReadWriteVersions(op.Get()); + auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(op.Get()); if (eraseTx->HasDependents()) { THolder<IChangeCollector> changeCollector{CreateChangeCollector(DataShard, txc.DB, request.GetTableId(), false)}; - changeCollector->SetWriteVersion(versions.WriteVersion); - if (changeCollector->NeedToReadKeys()) { - changeCollector->SetReadVersion(versions.ReadVersion); + if (changeCollector) { + changeCollector->SetWriteVersion(writeVersion); + if (changeCollector->NeedToReadKeys()) { + changeCollector->SetReadVersion(readVersion); + } } auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize()); - if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), versions.WriteVersion, changeCollector.Get())) { + if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, changeCollector.Get())) { return EExecutionStatus::Restart; } - op->ChangeRecords() = std::move(changeCollector->GetCollected()); + if (changeCollector) { + op->ChangeRecords() = std::move(changeCollector->GetCollected()); + } } else if (eraseTx->HasDependencies()) { THashMap<ui64, TDynBitMap> presentRows; for (const auto& dependency : eraseTx->GetDependencies()) { @@ -68,7 +72,7 @@ public: Y_VERIFY(body.ParseFromArray(rs.Body.data(), rs.Body.size())); Y_VERIFY(presentRows.contains(rs.Origin)); - const bool ok = Execute(txc, request, presentRows.at(rs.Origin), DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), versions.WriteVersion); + const bool ok = Execute(txc, request, presentRows.at(rs.Origin), DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), writeVersion); Y_VERIFY(ok); } } |