aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-06-09 13:07:22 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-06-09 13:07:22 +0300
commitd1c4287af422f0fb30b901b8d67f8c46c28cfa47 (patch)
treee3f713ab366a72136046958eea5c7f933d123cd6
parentf4e9c332e8837d3d34870573987b89b6795eca02 (diff)
downloadydb-d1c4287af422f0fb30b901b8d67f8c46c28cfa47.tar.gz
Return nullptr if there is neither indexes nor streams KIKIMR-14732
ref:285cfd6561a8b8446470ec3c09e9422b88d1b782
-rw-r--r--ydb/core/tx/datashard/change_collector.cpp11
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp20
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.h4
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp18
5 files changed, 39 insertions, 17 deletions
diff --git a/ydb/core/tx/datashard/change_collector.cpp b/ydb/core/tx/datashard/change_collector.cpp
index be21b22cbf..216d0876a9 100644
--- a/ydb/core/tx/datashard/change_collector.cpp
+++ b/ydb/core/tx/datashard/change_collector.cpp
@@ -87,13 +87,20 @@ private:
}; // TChangeCollectorProxy
IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) {
+ const bool hasAsyncIndexes = table.HasAsyncIndexes();
+ const bool hasCdcStreams = table.HasCdcStreams();
+
+ if (!hasAsyncIndexes && !hasCdcStreams) {
+ return nullptr;
+ }
+
auto proxy = MakeHolder<TChangeCollectorProxy>();
- if (table.HasAsyncIndexes()) {
+ if (hasAsyncIndexes) {
proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, db, isImmediateTx));
}
- if (table.HasCdcStreams()) {
+ if (hasCdcStreams) {
proxy->AddUnderlying(MakeHolder<TCdcStreamChangeCollector>(&dataShard, db, isImmediateTx));
}
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index a3cbd5a476..f94c60bb11 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -59,6 +59,9 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
if (CollectChanges) {
ChangeCollector.Reset(CreateChangeCollector(*self, txc.DB, tableInfo, true));
+ }
+
+ if (ChangeCollector) {
ChangeCollector->SetWriteVersion(writeVersion);
if (ChangeCollector->NeedToReadKeys()) {
ChangeCollector->SetReadVersion(readVersion);
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 574c5ff647..422b64db5e 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -66,9 +66,12 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, params.Txc->DB, tableInfo, true));
- params.Tx->ChangeCollector->SetWriteVersion(params.WriteVersion);
- if (params.Tx->ChangeCollector->NeedToReadKeys()) {
- params.Tx->ChangeCollector->SetReadVersion(params.ReadVersion);
+ }
+
+ if (auto collector = params.GetChangeCollector()) {
+ collector->SetWriteVersion(params.WriteVersion);
+ if (collector->NeedToReadKeys()) {
+ collector->SetReadVersion(params.ReadVersion);
}
}
@@ -133,9 +136,10 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
}
- Y_VERIFY(params.Tx->ChangeCollector);
- if (!params.Tx->ChangeCollector->Collect(fullTableId, NTable::ERowOp::Erase, key, {})) {
- pageFault = true;
+ if (auto collector = params.GetChangeCollector()) {
+ if (!collector->Collect(fullTableId, NTable::ERowOp::Erase, key, {})) {
+ pageFault = true;
+ }
}
if (pageFault) {
@@ -147,8 +151,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
if (pageFault) {
- if (params && params.Tx->ChangeCollector) {
- params.Tx->ChangeCollector->Reset();
+ if (auto collector = params.GetChangeCollector()) {
+ collector->Reset();
}
return EStatus::PageFault;
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h
index 7663b47d38..d526178b9d 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.h
+++ b/ydb/core/tx/datashard/datashard_direct_erase.h
@@ -52,6 +52,10 @@ class TDirectTxErase : public IDirectTx {
return true;
}
+
+ IChangeCollector* GetChangeCollector() const {
+ return Tx ? Tx->ChangeCollector.Get() : nullptr;
+ }
};
static EStatus CheckedExecute(
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 9b62c04a72..f16db8eaa3 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -39,22 +39,26 @@ public:
const auto& eraseTx = tx->GetDistributedEraseTx();
const auto& request = eraseTx->GetRequest();
- const auto versions = DataShard.GetReadWriteVersions(op.Get());
+ auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(op.Get());
if (eraseTx->HasDependents()) {
THolder<IChangeCollector> changeCollector{CreateChangeCollector(DataShard, txc.DB, request.GetTableId(), false)};
- changeCollector->SetWriteVersion(versions.WriteVersion);
- if (changeCollector->NeedToReadKeys()) {
- changeCollector->SetReadVersion(versions.ReadVersion);
+ if (changeCollector) {
+ changeCollector->SetWriteVersion(writeVersion);
+ if (changeCollector->NeedToReadKeys()) {
+ changeCollector->SetReadVersion(readVersion);
+ }
}
auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize());
- if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), versions.WriteVersion, changeCollector.Get())) {
+ if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, changeCollector.Get())) {
return EExecutionStatus::Restart;
}
- op->ChangeRecords() = std::move(changeCollector->GetCollected());
+ if (changeCollector) {
+ op->ChangeRecords() = std::move(changeCollector->GetCollected());
+ }
} else if (eraseTx->HasDependencies()) {
THashMap<ui64, TDynBitMap> presentRows;
for (const auto& dependency : eraseTx->GetDependencies()) {
@@ -68,7 +72,7 @@ public:
Y_VERIFY(body.ParseFromArray(rs.Body.data(), rs.Body.size()));
Y_VERIFY(presentRows.contains(rs.Origin));
- const bool ok = Execute(txc, request, presentRows.at(rs.Origin), DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), versions.WriteVersion);
+ const bool ok = Execute(txc, request, presentRows.at(rs.Origin), DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), writeVersion);
Y_VERIFY(ok);
}
}