aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-01-31 19:15:30 +0300
committerilnaz <ilnaz@ydb.tech>2023-01-31 19:15:30 +0300
commitac25aea7e219c6ce4287ecbdd0c4fee2ffaa1c4a (patch)
treef8cd0b57ce7fd767d9b8a93c94b6b53768d5ec3f
parentf77b2bcb6de6ac1a686cbf27026256ad7a6c7838 (diff)
downloadydb-ac25aea7e219c6ce4287ecbdd0c4fee2ffaa1c4a.tar.gz
Check path state after loading the operation list
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp20
1 files changed, 13 insertions, 7 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index f16c47eaf48..9b3ea10a731 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -19,7 +19,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TDeque<TPathId> BlockStoreVolumesToClean;
TVector<ui64> ExportsToResume;
TVector<ui64> ImportsToResume;
- TVector<TPathId> CdcStreamScansToResume;
+ THashMap<TPathId, TVector<TPathId>> CdcStreamScansToResume;
bool Broken = false;
explicit TTxInit(TSelf *self)
@@ -2830,11 +2830,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found"
<< ", cdc stream pathId: " << pathId
<< ", parent pathId: " << path->ParentPathId);
- auto parent = Self->PathsById.at(path->ParentPathId);
-
- if (parent->NormalState()) {
- CdcStreamScansToResume.push_back(pathId);
- }
+ CdcStreamScansToResume[path->ParentPathId].push_back(pathId);
}
if (!rowset.Next()) {
@@ -3371,6 +3367,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
// Remember which paths are still under operation
pathsUnderOperation.insert(txState.TargetPathId);
+ if (CdcStreamScansToResume.contains(txState.TargetPathId)) {
+ CdcStreamScansToResume.erase(txState.TargetPathId);
+ }
+
LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Adjusted PathState"
<< ", pathId: " << txState.TargetPathId
@@ -4771,11 +4771,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
return;
}
+ // flatten
+ TVector<TPathId> cdcStreamScansToResume;
+ for (auto& [_, v] : CdcStreamScansToResume) {
+ std::move(v.begin(), v.end(), std::back_inserter(cdcStreamScansToResume));
+ }
+
Self->ActivateAfterInitialization(ctx, {
.DelayPublications = std::move(delayPublications),
.ExportIds = ExportsToResume,
.ImportsIds = ImportsToResume,
- .CdcStreamScans = std::move(CdcStreamScansToResume),
+ .CdcStreamScans = std::move(cdcStreamScansToResume),
.TablesToClean = std::move(TablesToClean),
.BlockStoreVolumesToClean = std::move(BlockStoreVolumesToClean),
});