diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-01-31 19:15:30 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-01-31 19:15:30 +0300 |
commit | ac25aea7e219c6ce4287ecbdd0c4fee2ffaa1c4a (patch) | |
tree | f8cd0b57ce7fd767d9b8a93c94b6b53768d5ec3f | |
parent | f77b2bcb6de6ac1a686cbf27026256ad7a6c7838 (diff) | |
download | ydb-ac25aea7e219c6ce4287ecbdd0c4fee2ffaa1c4a.tar.gz |
Check path state after loading the operation list
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index f16c47eaf48..9b3ea10a731 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -19,7 +19,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { TDeque<TPathId> BlockStoreVolumesToClean; TVector<ui64> ExportsToResume; TVector<ui64> ImportsToResume; - TVector<TPathId> CdcStreamScansToResume; + THashMap<TPathId, TVector<TPathId>> CdcStreamScansToResume; bool Broken = false; explicit TTxInit(TSelf *self) @@ -2830,11 +2830,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Y_VERIFY_S(Self->PathsById.contains(path->ParentPathId), "Parent path is not found" << ", cdc stream pathId: " << pathId << ", parent pathId: " << path->ParentPathId); - auto parent = Self->PathsById.at(path->ParentPathId); - - if (parent->NormalState()) { - CdcStreamScansToResume.push_back(pathId); - } + CdcStreamScansToResume[path->ParentPathId].push_back(pathId); } if (!rowset.Next()) { @@ -3371,6 +3367,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { // Remember which paths are still under operation pathsUnderOperation.insert(txState.TargetPathId); + if (CdcStreamScansToResume.contains(txState.TargetPathId)) { + CdcStreamScansToResume.erase(txState.TargetPathId); + } + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Adjusted PathState" << ", pathId: " << txState.TargetPathId @@ -4771,11 +4771,17 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { return; } + // flatten + TVector<TPathId> cdcStreamScansToResume; + for (auto& [_, v] : CdcStreamScansToResume) { + std::move(v.begin(), v.end(), std::back_inserter(cdcStreamScansToResume)); + } + Self->ActivateAfterInitialization(ctx, { .DelayPublications = std::move(delayPublications), .ExportIds = ExportsToResume, .ImportsIds = ImportsToResume, - .CdcStreamScans = std::move(CdcStreamScansToResume), + .CdcStreamScans = std::move(cdcStreamScansToResume), .TablesToClean = std::move(TablesToClean), .BlockStoreVolumesToClean = std::move(BlockStoreVolumesToClean), }); |