aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Demin <deminds@ydb.tech>2025-02-17 18:54:10 +0300
committerGitHub <noreply@github.com>2025-02-17 15:54:10 +0000
commitb0912839e377593c0e1245347e282c8420ef6ac4 (patch)
tree1ba933ef9aee5ba58bfb96ae72af6f4361684c28
parentebacf4512d849d7c8bf583e0207c7b3e152c860e (diff)
downloadydb-b0912839e377593c0e1245347e282c8420ef6ac4.tar.gz
VIEW: minor refactoring in import (#14664)
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import__create.cpp42
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp2
2 files changed, 32 insertions, 12 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
index a09f090c03..f434d4983a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp
@@ -28,16 +28,35 @@ using namespace NTabletFlatExecutor;
namespace {
-bool IsWaiting(const TImportInfo::TItem& item) {
- return item.State == TImportInfo::EState::Waiting;
+using TItem = TImportInfo::TItem;
+using EState = TImportInfo::EState;
+
+bool IsWaiting(const TItem& item) {
+ return item.State == EState::Waiting;
+}
+
+THashSet<EState> CollectItemStates(const TVector<TItem>& items) {
+ THashSet<EState> itemStates;
+ for (const auto& item : items) {
+ itemStates.emplace(item.State);
+ }
+ return itemStates;
+}
+
+bool AllDone(const THashSet<EState>& itemStates) {
+ return AllOf(itemStates, [](EState state) { return state == EState::Done; });
}
-bool IsDoneOrWaiting(const TImportInfo::TItem& item) {
- return TImportInfo::TItem::IsDone(item) || IsWaiting(item);
+bool AllWaiting(const THashSet<EState>& itemStates) {
+ return AllOf(itemStates, [](EState state) { return state == EState::Waiting; });
+}
+
+bool AllDoneOrWaiting(const THashSet<EState>& itemStates) {
+ return AllOf(itemStates, [](EState state) { return state == EState::Done || state == EState::Waiting; });
}
// the item is to be created by query, i.e. it is not a table
-bool IsCreatedByQuery(const TImportInfo::TItem& item) {
+bool IsCreatedByQuery(const TItem& item) {
return !item.CreationQuery.empty();
}
@@ -904,13 +923,13 @@ private:
// Scheme error happens when the view depends on a table (or a view) that is not yet imported.
// Instead of tracking view dependencies, we simply retry the creation of the view later.
item.State = EState::Waiting;
+ Self->PersistImportItemState(db, importInfo, message.ItemIdx);
- if (AllOf(importInfo->Items, IsWaiting)) {
- // All items are waiting? Cancel the import, or we will end up waiting indefinitely.
+ const auto itemStates = CollectItemStates(importInfo->Items);
+ if (AllWaiting(itemStates)) {
+ // Cancel the import, or we will end up waiting indefinitely.
return CancelAndPersist(db, importInfo, message.ItemIdx, error, "creation query failed");
}
-
- Self->PersistImportItemState(db, importInfo, message.ItemIdx);
return;
}
@@ -1218,10 +1237,11 @@ private:
return SendNotificationsIfFinished(importInfo);
}
- if (AllOf(importInfo->Items, &TImportInfo::TItem::IsDone)) {
+ const auto itemStates = CollectItemStates(importInfo->Items);
+ if (AllDone(itemStates)) {
importInfo->State = EState::Done;
importInfo->EndTime = TAppData::TimeProvider->Now();
- } else if (AllOf(importInfo->Items, IsDoneOrWaiting)) {
+ } else if (AllDoneOrWaiting(itemStates)) {
RetryViewsCreation(importInfo, db, ctx);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp
index 267a0679fa..fbc88706e1 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_query_executor.cpp
@@ -99,7 +99,7 @@ class TSchemeQueryExecutor: public TActorBootstrapped<TSchemeQueryExecutor> {
void Finish(Ydb::StatusIds::StatusCode status, std::variant<TString, NKikimrSchemeOp::TModifyScheme> result) {
auto logMessage = TStringBuilder() << "TSchemeQueryExecutor Reply"
<< ", self: " << SelfId()
- << ", success: " << status;
+ << ", status: " << status;
LOG_I(logMessage);
std::visit([&]<typename T>(T& value) {