diff options
author | Daniil Demin <deminds@ydb.tech> | 2025-02-17 18:54:10 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-17 15:54:10 +0000 |
commit | b0912839e377593c0e1245347e282c8420ef6ac4 (patch) | |
tree | 1ba933ef9aee5ba58bfb96ae72af6f4361684c28 | |
parent | ebacf4512d849d7c8bf583e0207c7b3e152c860e (diff) | |
download | ydb-b0912839e377593c0e1245347e282c8420ef6ac4.tar.gz |
VIEW: minor refactoring in import (#14664)
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_import__create.cpp | 42 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp | 2 |
2 files changed, 32 insertions, 12 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index a09f090c03..f434d4983a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -28,16 +28,35 @@ using namespace NTabletFlatExecutor; namespace { -bool IsWaiting(const TImportInfo::TItem& item) { - return item.State == TImportInfo::EState::Waiting; +using TItem = TImportInfo::TItem; +using EState = TImportInfo::EState; + +bool IsWaiting(const TItem& item) { + return item.State == EState::Waiting; +} + +THashSet<EState> CollectItemStates(const TVector<TItem>& items) { + THashSet<EState> itemStates; + for (const auto& item : items) { + itemStates.emplace(item.State); + } + return itemStates; +} + +bool AllDone(const THashSet<EState>& itemStates) { + return AllOf(itemStates, [](EState state) { return state == EState::Done; }); } -bool IsDoneOrWaiting(const TImportInfo::TItem& item) { - return TImportInfo::TItem::IsDone(item) || IsWaiting(item); +bool AllWaiting(const THashSet<EState>& itemStates) { + return AllOf(itemStates, [](EState state) { return state == EState::Waiting; }); +} + +bool AllDoneOrWaiting(const THashSet<EState>& itemStates) { + return AllOf(itemStates, [](EState state) { return state == EState::Done || state == EState::Waiting; }); } // the item is to be created by query, i.e. it is not a table -bool IsCreatedByQuery(const TImportInfo::TItem& item) { +bool IsCreatedByQuery(const TItem& item) { return !item.CreationQuery.empty(); } @@ -904,13 +923,13 @@ private: // Scheme error happens when the view depends on a table (or a view) that is not yet imported. // Instead of tracking view dependencies, we simply retry the creation of the view later. item.State = EState::Waiting; + Self->PersistImportItemState(db, importInfo, message.ItemIdx); - if (AllOf(importInfo->Items, IsWaiting)) { - // All items are waiting? Cancel the import, or we will end up waiting indefinitely. + const auto itemStates = CollectItemStates(importInfo->Items); + if (AllWaiting(itemStates)) { + // Cancel the import, or we will end up waiting indefinitely. return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed"); } - - Self->PersistImportItemState(db, importInfo, message.ItemIdx); return; } @@ -1218,10 +1237,11 @@ private: return SendNotificationsIfFinished(importInfo); } - if (AllOf(importInfo->Items, &TImportInfo::TItem::IsDone)) { + const auto itemStates = CollectItemStates(importInfo->Items); + if (AllDone(itemStates)) { importInfo->State = EState::Done; importInfo->EndTime = TAppData::TimeProvider->Now(); - } else if (AllOf(importInfo->Items, IsDoneOrWaiting)) { + } else if (AllDoneOrWaiting(itemStates)) { RetryViewsCreation(importInfo, db, ctx); } diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp index 267a0679fa..fbc88706e1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp @@ -99,7 +99,7 @@ class TSchemeQueryExecutor: public TActorBootstrapped<TSchemeQueryExecutor> { void Finish(Ydb::StatusIds::StatusCode status, std::variant<TString, NKikimrSchemeOp::TModifyScheme> result) { auto logMessage = TStringBuilder() << "TSchemeQueryExecutor Reply" << ", self: " << SelfId() - << ", success: " << status; + << ", status: " << status; LOG_I(logMessage); std::visit([&]<typename T>(T& value) { |