about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> 2024-03-13 07:57:13 +0300
committer GitHub <noreply@github.com> 2024-03-13 07:57:13 +0300
commit 8500ceb8f4f8742530f7ca71c99e426ba3617e28 (patch)
tree 90ddba80d62b2882e1e8d27c9ba72ed770ae7399
parent a9fc0430cb93c2c5488e752cc147360be35b37d8 (diff)
download ydb-8500ceb8f4f8742530f7ca71c99e426ba3617e28.tar.gz
split tiering and scheme actualizers (#2679)
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp | 15
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h | 3
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.h | 6
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.cpp | 5
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h | 34
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.cpp | 5
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.h | 58
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/abstract/ya.make | 12
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.cpp | 5
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.h (renamed from ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.h) | 7
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/counters/ya.make | 13
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.cpp | 5
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp | 248
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h | 202
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make | 1
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp | 81
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h | 71
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/scheme/ya.make | 11
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp | 159
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h | 102
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/tiering/ya.make | 11
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/actualizer/ya.make | 4
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/granule.cpp | 3
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/granule.h | 8
-rw-r--r-- ydb/core/tx/columnshard/splitter/batch_slice.h | 2
-rw-r--r-- ydb/core/tx/columnshard/splitter/rb_splitter.h | 1
-rw-r--r-- ydb/core/tx/columnshard/splitter/ut/ya.make | 1
-rw-r--r-- ydb/core/tx/columnshard/splitter/ya.make | 2
28 files changed, 630 insertions, 445 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp
index 09da4e8826d..e089ba11c1e 100644
--- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp
@@ -1,6 +1,7 @@
#include "context.h"
#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
namespace NKikimr::NOlap::NActualizer {
@@ -16,14 +17,12 @@ TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, cons
}
-bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait, const TInstant now) {
- const TInstant maxChangePortionInstant = info.RecordSnapshotMax().GetPlanInstant();
- if (features.GetTargetTierName() != IStoragesManager::DefaultStorageId && info.GetTierNameDef(IStoragesManager::DefaultStorageId) == IStoragesManager::DefaultStorageId) {
- if (now - maxChangePortionInstant < NYDBTest::TControllers::GetColumnShardController()->GetLagForCompactionBeforeTierings(TDuration::Minutes(60))) {
- Counters.OnActualizationSkipTooFreshPortion(now - maxChangePortionInstant);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", now - maxChangePortionInstant);
- return true;
- }
+bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait) {
+ if (!UsedPortions.emplace(info.GetAddress()).second) {
+ return true;
+ }
+ if (DataLocksManager->IsLocked(info)) {
+ return true;
}
const auto buildNewTask = [&]() {
diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h
index da972b86e18..0e5a5e326d3 100644
--- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h
+++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h
@@ -26,6 +26,7 @@ public:
class TTieringProcessContext {
private:
+ THashSet<TPortionAddress> UsedPortions;
const ui64 MemoryUsageLimit;
TSaverContext SaverContext;
THashMap<TRWAddress, std::vector<TTaskConstructor>> Tasks;
@@ -43,7 +44,7 @@ public:
return Tasks;
}
- bool AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait, const TInstant now);
+ bool AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait);
bool IsRWAddressAvailable(const TRWAddress& address) const {
auto it = Tasks.find(address);
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 5ed8dc6c84b..fc9052c3823 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -303,11 +303,7 @@ public:
}
bool CheckForCleanup(const TSnapshot& snapshot) const {
- if (!HasRemoveSnapshot()) {
- return false;
- } else {
- return GetRemoveSnapshotVerified() <= snapshot;
- }
+ return IsRemovedFor(snapshot);
}
bool CheckForCleanup() const {
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.cpp
new file mode 100644
index 00000000000..d193e92e3e1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.cpp
@@ -0,0 +1,5 @@
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NActualizer {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h
new file mode 100644
index 00000000000..077ce3a2754
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h
@@ -0,0 +1,34 @@
+#pragma once
+#include "context.h"
+
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+
+namespace NKikimr::NOlap::NActualizer {
+
+class IActualizer {
+protected:
+ virtual void DoAddPortion(const TPortionInfo& info, const TAddExternalContext& context) = 0;
+ virtual void DoRemovePortion(const TPortionInfo& info) = 0;
+ virtual void DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) const = 0;
+public:
+ virtual ~IActualizer() = default;
+ void BuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) {
+ return DoBuildTasks(tasksContext, externalContext, internalContext);
+ }
+ void AddPortion(const std::shared_ptr<TPortionInfo>& info, const TAddExternalContext& context) {
+ AFL_VERIFY(info);
+ if (info->HasRemoveSnapshot()) {
+ return;
+ }
+ return DoAddPortion(*info, context);
+ }
+ void RemovePortion(const std::shared_ptr<TPortionInfo>& info) {
+ AFL_VERIFY(info);
+ if (info->HasRemoveSnapshot()) {
+ return;
+ }
+ return DoRemovePortion(*info);
+ }
+};
+
+}
\ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.cpp
new file mode 100644
index 00000000000..6ef73de8ceb
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.cpp
@@ -0,0 +1,5 @@
+#include "context.h"
+
+namespace NKikimr::NOlap::NActualizer {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.h b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.h
new file mode 100644
index 00000000000..0c2ad3b78b3
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.h
@@ -0,0 +1,58 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+#include <util/generic/hash.h>
+
+namespace NKikimr::NOlap {
+class TPortionInfo;
+}
+
+namespace NKikimr::NOlap::NActualizer {
+
+class TTieringProcessContext;
+
+class TAddExternalContext {
+private:
+ YDB_READONLY_DEF(TInstant, Now);
+ const THashMap<ui64, std::shared_ptr<TPortionInfo>>& Portions;
+public:
+ TAddExternalContext(const TInstant now, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions)
+ : Now(now)
+ , Portions(portions)
+ {
+
+ }
+
+ const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const {
+ return Portions;
+ }
+};
+
+class TExternalTasksContext {
+private:
+ const THashMap<ui64, std::shared_ptr<TPortionInfo>>& Portions;
+public:
+ const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const {
+ return Portions;
+ }
+
+ const std::shared_ptr<TPortionInfo>& GetPortionVerified(const ui64 portionId) const {
+ auto it = Portions.find(portionId);
+ AFL_VERIFY(it != Portions.end());
+ return it->second;
+ }
+
+ TExternalTasksContext(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions)
+ : Portions(portions)
+ {
+
+ }
+};
+
+class TInternalTasksContext {
+public:
+};
+
+}
\ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/ya.make
new file mode 100644
index 00000000000..90979469658
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+ context.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/engines/scheme/versions
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.cpp
new file mode 100644
index 00000000000..f6e4cca6431
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.cpp
@@ -0,0 +1,5 @@
+#include "counters.h"
+
+namespace NKikimr::NOlap::NActualizer {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.h b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.h
index 520befbb965..c796b62f2c0 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.h
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.h
@@ -1,9 +1,6 @@
#pragma once
-#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
-#include <ydb/core/formats/arrow/replace_key.h>
-#include <ydb/library/accessor/accessor.h>
-#include <ydb/core/tx/columnshard/splitter/settings.h>
-#include <ydb/core/tx/columnshard/counters/engine_logs.h>
+#include <ydb/core/tx/columnshard/counters/common/owner.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
namespace NKikimr::NOlap::NActualizer {
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/counters/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/ya.make
new file mode 100644
index 00000000000..d73b370747b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ counters.cpp
+)
+
+PEERDIR(
+ ydb/library/actors/core
+ ydb/core/tx/columnshard/engines/portions
+ ydb/core/tx/columnshard/counters/common
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.cpp
deleted file mode 100644
index 146a8700def..00000000000
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.cpp
+++ /dev/null
@@ -1,5 +0,0 @@
-#include "counters.h"
-
-namespace NKikimr::NOlap::NStorageOptimizer::NBuckets {
-
-}
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp
index 739c1ebae2e..e4bd166e9c6 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp
@@ -1,249 +1,51 @@
#include "index.h"
-#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
-#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
-#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h>
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h>
namespace NKikimr::NOlap::NActualizer {
-void TGranuleActualizationIndex::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portionInfos) const {
- THashSet<ui64> usedPortions;
- for (auto&& [address, addressPortions] : PortionIdByWaitDuration) {
- if (!context.IsRWAddressAvailable(address)) {
- break;
- }
- for (auto&& [duration, portions] : addressPortions) {
- if (duration - (context.Now - StartInstant) > TDuration::Zero()) {
- break;
- }
- bool limitEnriched = false;
- for (auto&& p : portions) {
- auto itPortionInfo = portionInfos.find(p);
- AFL_VERIFY(itPortionInfo != portionInfos.end());
- if (context.DataLocksManager->IsLocked(*itPortionInfo->second)) {
- continue;
- }
- auto portion = itPortionInfo->second;
-
- auto info = BuildActualizationInfo(portion, context.Now);
- auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot());
- TPortionEvictionFeatures features(portionScheme, info.GetScheme() ? info.GetScheme()->GetTargetScheme() : portionScheme, portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
-
- std::optional<TDuration> lateness;
- if (info.GetEviction()) {
- features.SetTargetTierName(info.GetEviction()->GetTargetTierName());
- lateness = info.GetEviction()->GetLateness();
- }
-
- if (!context.AddPortion(*portion, std::move(features), lateness, context.Now)) {
- limitEnriched = true;
- break;
- }
- }
- if (limitEnriched) {
- break;
- }
- }
- }
- for (auto&& [address, portions] : PortionsToActualizeScheme) {
- if (!context.IsRWAddressAvailable(address)) {
- break;
- }
- for (auto&& portionId : portions) {
- if (!usedPortions.emplace(portionId).second) {
- continue;
- }
- auto itPortionInfo = portionInfos.find(portionId);
- AFL_VERIFY(itPortionInfo != portionInfos.end());
- auto portion = itPortionInfo->second;
- if (context.DataLocksManager->IsLocked(*portion)) {
- continue;
- }
-
- auto info = BuildActualizationInfo(portion, context.Now);
- auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot());
- TPortionEvictionFeatures features(portionScheme, info.GetScheme() ? info.GetScheme()->GetTargetScheme() : portionScheme, portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
-
- if (!context.AddPortion(*portion, std::move(features), {}, context.Now)) {
- break;
- }
- }
+void TGranuleActualizationIndex::BuildActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const {
+ TInternalTasksContext internalContext;
+ for (auto&& i : Actualizers) {
+ i->BuildTasks(tasksContext, externalContext, internalContext);
}
}
-TGranuleActualizationIndex::TActualizationInfo TGranuleActualizationIndex::BuildActualizationInfo(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) const {
- const auto portionSchema = VersionedIndex.GetSchema(portion->GetMinSnapshot());
- auto targetSchema = ActualCriticalScheme ? ActualCriticalScheme : portionSchema;
- const auto currentTierName = portion->GetTierNameDef(IStoragesManager::DefaultStorageId);
- TActualizationInfo result;
- if (Tiering) {
- AFL_VERIFY(TieringColumnId);
- auto statOperator = portionSchema->GetIndexInfo().GetStatistics(NStatistics::TIdentifier(NStatistics::EType::Max, {*TieringColumnId}));
- std::shared_ptr<arrow::Scalar> max;
- if (!statOperator) {
- max = portion->MaxValue(*TieringColumnId);
- if (!max) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
- return result;
- }
- } else {
- NYDBTest::TControllers::GetColumnShardController()->OnStatisticsUsage(statOperator);
- max = statOperator.GetScalarVerified(portion->GetMeta().GetStatisticsStorage());
- }
- auto tieringInfo = Tiering->GetTierToMove(max, now);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString());
- std::optional<i64> d;
- std::set<TString> storagesWrite;
- TString targetTierName;
- if (portion->GetTierNameDef(IStoragesManager::DefaultStorageId) != tieringInfo.GetCurrentTierName()) {
- d = -1 * tieringInfo.GetCurrentTierLag().GetValue();
- targetTierName = tieringInfo.GetCurrentTierName();
- } else if (tieringInfo.GetNextTierName()) {
- d = tieringInfo.GetNextTierWaitingVerified().GetValue();
- targetTierName = tieringInfo.GetNextTierNameVerified();
- }
- if (d) {
-// if (currentTierName == "deploy_logs_s3" && targetTierName == IStoragesManager::DefaultStorageId) {
-// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
-// AFL_VERIFY(false)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId);
-// }
- auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(targetTierName);
- auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName);
- TRWAddress address(std::move(storagesRead), std::move(storagesWrite));
- result.SetEviction(std::move(address), targetTierName, *d);
- if (*d < (i64)TDuration::Minutes(1).Seconds()) {
- return result;
- }
- }
- } else if (currentTierName != IStoragesManager::DefaultStorageId) {
-// if (currentTierName == "deploy_logs_s3") {
-// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("pathId", PathId);
-// AFL_VERIFY(false)("pathId", PathId);
-// }
- auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(IStoragesManager::DefaultStorageId);
- auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName);
- TRWAddress address(std::move(storagesRead), std::move(storagesWrite));
- result.SetEviction(std::move(address), IStoragesManager::DefaultStorageId, 0);
- return result;
- }
- if (portionSchema->GetVersion() < targetSchema->GetVersion()) {
- auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(currentTierName);
- auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName);
- TRWAddress address(std::move(storagesRead), std::move(storagesWrite));
- result.SetScheme(std::move(address), targetSchema);
- }
- return result;
-}
-
-void TGranuleActualizationIndex::AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) {
- if (!Started) {
- return;
- }
- if (portion->HasRemoveSnapshot()) {
- return;
- }
- auto actualizationInfo = BuildActualizationInfo(portion, now);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("actualization_info", actualizationInfo.DebugString());
- TPortionIndexInfo result;
- if (actualizationInfo.GetEviction()) {
- auto& eviction = *actualizationInfo.GetEviction();
- PortionIdByWaitDuration[eviction.GetAddress()][eviction.GetWaitDuration() + (now - StartInstant)].emplace(portion->GetPortionId());
- auto address = eviction.GetAddress();
- result.SetEviction(std::move(address), eviction.GetWaitDuration() + (now - StartInstant));
- }
- if (actualizationInfo.GetScheme()) {
- auto& scheme = *actualizationInfo.GetScheme();
- PortionsToActualizeScheme[scheme.GetAddress()].emplace(portion->GetPortionId());
- auto address = scheme.GetAddress();
- result.SetScheme(std::move(address));
- }
- result.AddCounters(Counters, portion);
- if (!result.IsEmpty()) {
- AFL_VERIFY(PortionsInfo.emplace(portion->GetPortionId(), std::move(result)).second);
+void TGranuleActualizationIndex::AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TAddExternalContext& context) {
+ for (auto&& i : Actualizers) {
+ i->AddPortion(portion, context);
}
}
void TGranuleActualizationIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
- auto it = PortionsInfo.find(portion->GetPortionId());
- if (it == PortionsInfo.end()) {
- return;
- }
- if (portion->HasRemoveSnapshot()) {
- return;
+ for (auto&& i : Actualizers) {
+ i->RemovePortion(portion);
}
- if (it->second.GetEviction()) {
- auto itAddress = PortionIdByWaitDuration.find(it->second.GetEviction()->GetRWAddress());
- AFL_VERIFY(itAddress != PortionIdByWaitDuration.end());
- auto itDuration = itAddress->second.find(it->second.GetEviction()->GetWaitDuration());
- AFL_VERIFY(itDuration != itAddress->second.end());
- AFL_VERIFY(itDuration->second.erase(portion->GetPortionId()));
- if (itDuration->second.empty()) {
- itAddress->second.erase(itDuration);
- }
- if (itAddress->second.empty()) {
- PortionIdByWaitDuration.erase(itAddress);
- }
- }
-
- if (it->second.GetScheme()) {
- auto itAddress = PortionsToActualizeScheme.find(it->second.GetScheme()->GetRWAddress());
- AFL_VERIFY(itAddress != PortionsToActualizeScheme.end());
- AFL_VERIFY(itAddress->second.erase(portion->GetPortionId()));
- if (itAddress->second.empty()) {
- PortionsToActualizeScheme.erase(itAddress);
- }
- }
- it->second.RemoveCounters(Counters, portion);
- PortionsInfo.erase(it);
}
-void TGranuleActualizationIndex::Rebuild(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions) {
- if (!Started) {
- return;
- }
- StartInstant = HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now();
- PortionIdByWaitDuration.clear();
- PortionsToActualizeScheme.clear();
- PortionsInfo.clear();
- for (auto&& [_, portion] : portions) {
- if (portion->HasRemoveSnapshot()) {
- continue;
- }
- AddPortion(portion, StartInstant);
- }
+void TGranuleActualizationIndex::RefreshTiering(const std::optional<TTiering>& info, const TAddExternalContext& context) {
+ AFL_VERIFY(TieringActualizer);
+ TieringActualizer->Refresh(info, context);
}
-bool TGranuleActualizationIndex::RefreshTiering(const std::optional<TTiering>& info) {
- AFL_VERIFY(Started);
- Tiering = info;
- if (Tiering) {
- TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetTtlColumn());
- } else {
- TieringColumnId = {};
- }
- return true;
-}
-
-bool TGranuleActualizationIndex::RefreshScheme() {
- AFL_VERIFY(Started);
- if (!ActualCriticalScheme) {
- if (!VersionedIndex.GetLastCriticalSchema()) {
- return false;
- }
- } else if (ActualCriticalScheme->GetVersion() == VersionedIndex.GetLastCriticalSchemaDef(ActualCriticalScheme)->GetVersion()) {
- return false;
- }
- ActualCriticalScheme = VersionedIndex.GetLastCriticalSchema();
- return true;
+void TGranuleActualizationIndex::RefreshScheme(const TAddExternalContext& context) {
+ AFL_VERIFY(SchemeActualizer);
+ SchemeActualizer->Refresh(context);
}
TGranuleActualizationIndex::TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex)
: PathId(pathId)
, VersionedIndex(versionedIndex)
- , StartInstant(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now())
{
Y_UNUSED(PathId);
}
+void TGranuleActualizationIndex::Start() {
+ AFL_VERIFY(Actualizers.empty());
+ TieringActualizer = std::make_shared<TTieringActualizer>(PathId, VersionedIndex);
+ SchemeActualizer = std::make_shared<TSchemeActualizer>(PathId, VersionedIndex);
+ Actualizers.emplace_back(TieringActualizer);
+ Actualizers.emplace_back(SchemeActualizer);
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h
index fbf3b0c6320..fb740f8b163 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h
@@ -1,212 +1,36 @@
#pragma once
-#include "counters.h"
-
-#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
-#include <ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h>
-#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
-
-#include <ydb/core/base/appdata.h>
-
-#include <ydb/library/accessor/accessor.h>
-
-#include <util/datetime/base.h>
-#include <util/generic/hash.h>
-#include <util/generic/hash_set.h>
-#include <map>
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.h>
namespace NKikimr::NOlap {
class TVersionedIndex;
-class TPortionInfo;
+class TTiering;
}
namespace NKikimr::NOlap::NActualizer {
-
-class TTieringProcessContext;
+class TTieringActualizer;
+class TSchemeActualizer;
class TGranuleActualizationIndex {
private:
TCounters Counters;
+ std::vector<std::shared_ptr<IActualizer>> Actualizers;
- class TPortionIndexInfo {
- public:
- class TEviction {
- private:
- TRWAddress RWAddress;
- YDB_READONLY_DEF(TDuration, WaitDuration);
- public:
- const TRWAddress& GetRWAddress() const {
- return RWAddress;
- }
-
- TEviction(TRWAddress&& rwAddress, const TDuration waitDuration)
- : RWAddress(std::move(rwAddress))
- , WaitDuration(waitDuration)
- {
-
- }
- };
-
- class TScheme {
- private:
- TRWAddress RWAddress;
- public:
- const TRWAddress& GetRWAddress() const {
- return RWAddress;
- }
-
- TScheme(TRWAddress&& rwAddress)
- : RWAddress(std::move(rwAddress))
- {
-
- }
- };
- private:
- YDB_READONLY_DEF(std::optional<TEviction>, Eviction);
- YDB_READONLY_DEF(std::optional<TScheme>, Scheme);
- public:
- TPortionIndexInfo() = default;
-
- bool IsEmpty() const {
- return !Eviction && !Scheme;
- }
-
- void SetEviction(TRWAddress&& address, const TDuration waitDuration) {
- AFL_VERIFY(!Eviction);
- Eviction = TEviction(std::move(address), waitDuration);
- }
-
- void SetScheme(TRWAddress&& address) {
- AFL_VERIFY(!Scheme);
- Scheme = TScheme(std::move(address));
- }
-
- void AddCounters(TCounters& counters, const std::shared_ptr<TPortionInfo>& info) const {
- counters.PortionsToSyncSchema->AddPortion(info);
- }
- void RemoveCounters(TCounters& counters, const std::shared_ptr<TPortionInfo>& info) const {
- counters.PortionsToSyncSchema->RemovePortion(info);
- }
- };
-
- THashMap<TRWAddress, std::map<TDuration, THashSet<ui64>>> PortionIdByWaitDuration;
- THashMap<ui64, TPortionIndexInfo> PortionsInfo;
- THashMap<TRWAddress, THashSet<ui64>> PortionsToActualizeScheme;
+ std::shared_ptr<TTieringActualizer> TieringActualizer;
+ std::shared_ptr<TSchemeActualizer> SchemeActualizer;
const ui64 PathId;
const TVersionedIndex& VersionedIndex;
-
- ISnapshotSchema::TPtr ActualCriticalScheme;
- std::optional<TTiering> Tiering;
- std::optional<ui32> TieringColumnId;
- TInstant StartInstant = TInstant::Zero();
- bool Started = false;
public:
- void Start() {
- AFL_VERIFY(!Started);
- Started = true;
- }
-
- class TActualizationInfo {
- public:
- class TEvictionInfo {
- private:
- TRWAddress Address;
- YDB_ACCESSOR_DEF(TString, TargetTierName);
- i64 WaitDurationValue;
- public:
- TString DebugString() const {
- return TStringBuilder() << "{address=" << Address.DebugString() << ";target_tier=" << TargetTierName << ";wait_duration=" << TDuration::FromValue(WaitDurationValue) << "}";
- }
-
- const TRWAddress& GetAddress() const {
- return Address;
- }
-
- TEvictionInfo(TRWAddress&& address, const TString& targetTierName, const i64 waitDurationValue)
- : Address(std::move(address))
- , TargetTierName(targetTierName)
- , WaitDurationValue(waitDurationValue)
- {
-
- }
-
- TDuration GetWaitDuration() const {
- if (WaitDurationValue >= 0) {
- return TDuration::FromValue(WaitDurationValue);
- } else {
- return TDuration::Zero();
- }
- }
-
- TDuration GetLateness() const {
- if (WaitDurationValue >= 0) {
- return TDuration::Zero();
- } else {
- return TDuration::FromValue(-WaitDurationValue);
- }
- }
- };
-
- class TSchemeInfo {
- private:
- TRWAddress Address;
- YDB_ACCESSOR_DEF(std::shared_ptr<ISnapshotSchema>, TargetScheme);
- public:
- TString DebugString() const {
- return TStringBuilder() << "{address=" << Address.DebugString() << ";target_scheme=" << TargetScheme->DebugString() << "}";
- }
-
- const TRWAddress& GetAddress() const {
- return Address;
- }
-
- TSchemeInfo(TRWAddress&& address, const std::shared_ptr<ISnapshotSchema>& targetScheme)
- : Address(std::move(address))
- , TargetScheme(targetScheme) {
-
- }
- };
- private:
- YDB_READONLY_DEF(std::optional<TEvictionInfo>, Eviction);
- YDB_READONLY_DEF(std::optional<TSchemeInfo>, Scheme);
- public:
- TActualizationInfo() = default;
-
- TString DebugString() const {
- TStringBuilder sb;
- if (Eviction) {
- sb << "EVICTION=" << Eviction->DebugString() << ";";
- }
- if (Scheme) {
- sb << "SCHEME=" << Scheme->DebugString() << ";";
- }
- return sb;
- }
-
- void SetEviction(TRWAddress&& address, const TString& targetTierName, const i64 waitDurationValue) {
- AFL_VERIFY(!Eviction);
- Eviction = TEvictionInfo(std::move(address), targetTierName, waitDurationValue);
- }
-
- void SetScheme(TRWAddress&& address, const std::shared_ptr<ISnapshotSchema>& targetScheme) {
- AFL_VERIFY(!Scheme);
- Scheme = TSchemeInfo(std::move(address), targetScheme);
- }
- };
-
+ void Start();
TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex);
- void BuildActualizationTasks(TTieringProcessContext& context, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portionInfos) const;
-
- [[nodiscard]] bool RefreshTiering(const std::optional<TTiering>& info);
- [[nodiscard]] bool RefreshScheme();
-
- void Rebuild(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions);
-
- TActualizationInfo BuildActualizationInfo(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) const;
+ void BuildActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const;
- void AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TInstant now);
+ void RefreshTiering(const std::optional<TTiering>& info, const TAddExternalContext& context);
+ void RefreshScheme(const TAddExternalContext& context);
+ void AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TAddExternalContext& context);
void RemovePortion(const std::shared_ptr<TPortionInfo>& portion);
};
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make
index ff8f03727d0..8308be890dc 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make
@@ -2,7 +2,6 @@ LIBRARY()
SRCS(
index.cpp
- counters.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp
new file mode 100644
index 00000000000..71a6aa6d49e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp
@@ -0,0 +1,81 @@
+#include "scheme.h"
+#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h>
+#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
+
+namespace NKikimr::NOlap::NActualizer {
+
+// Decides whether `portion` has to be rewritten under the target schema.
+// Returns the read/write storage address plus the target schema when the portion's schema version
+// is lower than the target one; returns an empty optional otherwise.
+// TargetSchema must be set before the call (checked by AFL_VERIFY).
+std::optional<NKikimr::NOlap::NActualizer::TSchemeActualizer::TFullActualizationInfo> TSchemeActualizer::BuildActualizationInfo(const TPortionInfo& portion) const {
+    AFL_VERIFY(TargetSchema);
+    const TString& tierName = portion.GetTierNameDef(IStoragesManager::DefaultStorageId);
+    auto sourceSchema = VersionedIndex.GetSchema(portion.GetMinSnapshot());
+    if (!(sourceSchema->GetVersion() < TargetSchema->GetVersion())) {
+        // The portion is already at (or beyond) the target schema version.
+        return {};
+    }
+    // Data is written with the target schema and read with the portion's own schema.
+    auto writeStorages = TargetSchema->GetIndexInfo().GetUsedStorageIds(tierName);
+    auto readStorages = sourceSchema->GetIndexInfo().GetUsedStorageIds(tierName);
+    return TFullActualizationInfo(TRWAddress(std::move(readStorages), std::move(writeStorages)), TargetSchema);
+}
+
+// Registers the portion for schema actualization if a target schema is set and the portion lags behind it.
+// The external context is not used by this actualizer.
+void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExternalContext& /*context*/) {
+    if (!TargetSchema) {
+        return;
+    }
+    if (auto actualization = BuildActualizationInfo(info)) {
+        // The address must be read before ExtractFindId() moves it out of the info object.
+        PortionsToActualizeScheme[actualization->GetAddress()].emplace(info.GetPortionId());
+        PortionsInfo.emplace(info.GetPortionId(), actualization->ExtractFindId());
+    }
+}
+
+// Unregisters the portion from schema actualization.
+// Portions that were never registered (no entry in PortionsInfo) are ignored.
+// Both indexes are kept in sync: the id is removed from the per-address set and the
+// lookup record is removed from PortionsInfo.
+void TSchemeActualizer::DoRemovePortion(const TPortionInfo& info) {
+    auto it = PortionsInfo.find(info.GetPortionId());
+    if (it == PortionsInfo.end()) {
+        return;
+    }
+
+    auto itAddress = PortionsToActualizeScheme.find(it->second.GetRWAddress());
+    AFL_VERIFY(itAddress != PortionsToActualizeScheme.end());
+    AFL_VERIFY(itAddress->second.erase(info.GetPortionId()));
+    if (itAddress->second.empty()) {
+        // Drop empty buckets so DoBuildTasks does not iterate over them.
+        PortionsToActualizeScheme.erase(itAddress);
+    }
+    // Without this the stale record would make a repeated removal fail the verifications above
+    // and would make a later DoAddPortion for the same id silently skip the PortionsInfo update.
+    PortionsInfo.erase(it);
+}
+
+// Feeds portions registered for schema actualization into `tasksContext`.
+// Addresses are processed independently: an address that is currently unavailable is skipped,
+// and a refused portion stops processing of its own address only.
+// The internal context is not used by this actualizer.
+void TSchemeActualizer::DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) const {
+    for (auto&& [address, portions] : PortionsToActualizeScheme) {
+        if (!tasksContext.IsRWAddressAvailable(address)) {
+            // Hash map order is arbitrary, so stopping here would starve the remaining addresses.
+            continue;
+        }
+        for (auto&& portionId : portions) {
+            auto portion = externalContext.GetPortionVerified(portionId);
+            // Every registered portion is expected to still require actualization.
+            auto info = BuildActualizationInfo(*portion);
+            AFL_VERIFY(info);
+            auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot());
+            TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
+
+            if (!tasksContext.AddPortion(*portion, std::move(features), {})) {
+                break;
+            }
+        }
+    }
+}
+
+// Re-reads the last critical schema and rebuilds the actualization indexes from the portions
+// supplied by `externalContext`. When there is no critical schema nothing may be registered.
+void TSchemeActualizer::Refresh(const TAddExternalContext& externalContext) {
+    TargetSchema = VersionedIndex.GetLastCriticalSchema();
+    if (!TargetSchema) {
+        AFL_VERIFY(PortionsInfo.empty());
+        return;
+    }
+    PortionsInfo.clear();
+    PortionsToActualizeScheme.clear();
+    for (auto&& [portionId, portion] : externalContext.GetPortions()) {
+        AddPortion(portion, externalContext);
+    }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h
new file mode 100644
index 00000000000..55e96745393
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h
@@ -0,0 +1,71 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
+#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
+
+namespace NKikimr::NOlap::NActualizer {
+
+// Actualizer that tracks portions whose schema version is lower than the last critical schema
+// and offers them for rewriting under that schema.
+class TSchemeActualizer: public IActualizer {
+private:
+    // Ids of portions awaiting schema actualization, grouped by read/write storage address.
+    THashMap<TRWAddress, THashSet<ui64>> PortionsToActualizeScheme;
+    // Schema the portions are upgraded to; assigned by Refresh().
+    std::shared_ptr<ISnapshotSchema> TargetSchema;
+    const ui64 PathId;
+    const TVersionedIndex& VersionedIndex;
+
+    // Minimal per-portion record needed to locate the portion inside PortionsToActualizeScheme.
+    class TFindActualizationInfo {
+    private:
+        TRWAddress RWAddress;
+    public:
+        const TRWAddress& GetRWAddress() const {
+            return RWAddress;
+        }
+
+        // Explicit: an address must not silently convert into a lookup record.
+        explicit TFindActualizationInfo(TRWAddress&& rwAddress)
+            : RWAddress(std::move(rwAddress)) {
+
+        }
+    };
+
+    // Lookup records by portion id.
+    THashMap<ui64, TFindActualizationInfo> PortionsInfo;
+
+    // Full result of the actualization decision: where to read/write and which schema to apply.
+    class TFullActualizationInfo {
+    private:
+        TRWAddress Address;
+        YDB_ACCESSOR_DEF(std::shared_ptr<ISnapshotSchema>, TargetScheme);
+    public:
+        // Moves the address out of this object; GetAddress() must not be relied on afterwards.
+        TFindActualizationInfo ExtractFindId() {
+            return TFindActualizationInfo(std::move(Address));
+        }
+
+        TString DebugString() const {
+            return TStringBuilder() << "{address=" << Address.DebugString() << ";target_scheme=" << TargetScheme->DebugString() << "}";
+        }
+
+        const TRWAddress& GetAddress() const {
+            return Address;
+        }
+
+        TFullActualizationInfo(TRWAddress&& address, const std::shared_ptr<ISnapshotSchema>& targetScheme)
+            : Address(std::move(address))
+            , TargetScheme(targetScheme) {
+
+        }
+    };
+
+    // Returns the actualization decision for the portion, or an empty optional when none is needed.
+    std::optional<TFullActualizationInfo> BuildActualizationInfo(const TPortionInfo& portion) const;
+
+protected:
+    virtual void DoAddPortion(const TPortionInfo& info, const TAddExternalContext& context) override;
+    virtual void DoRemovePortion(const TPortionInfo& info) override;
+    virtual void DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) const override;
+public:
+    // Re-reads the target schema and rebuilds the indexes from the portions in `externalContext`.
+    void Refresh(const TAddExternalContext& externalContext);
+
+    TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex)
+        : PathId(pathId)
+        , VersionedIndex(versionedIndex) {
+        // PathId is stored but not read anywhere in this class yet.
+        Y_UNUSED(PathId);
+    }
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/ya.make
new file mode 100644
index 00000000000..f987d972103
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+ scheme.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/engines/scheme/versions
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp
new file mode 100644
index 00000000000..4db06e8d421
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp
@@ -0,0 +1,159 @@
+#include "tiering.h"
+#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h>
+#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/data_locks/manager/manager.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+
+namespace NKikimr::NOlap::NActualizer {
+
+// Picks the schema a portion should be written with: the critical schema when one is set and is
+// newer than the portion's schema, otherwise the portion's own schema.
+std::shared_ptr<NKikimr::NOlap::ISnapshotSchema> TTieringActualizer::GetTargetSchema(const std::shared_ptr<ISnapshotSchema>& portionSchema) const {
+    const bool upgradeRequired = TargetCriticalSchema && portionSchema->GetVersion() < TargetCriticalSchema->GetVersion();
+    if (upgradeRequired) {
+        return TargetCriticalSchema;
+    }
+    return portionSchema;
+}
+
+// Computes where and when the portion should be moved according to the current tiering settings.
+//
+// With tiering configured, the maximum value of the tiering column (taken from portion statistics
+// when available, otherwise from the portion itself) is passed to Tiering->GetTierToMove():
+//  * if the portion is not in the tier reported as current, the result targets that tier with a
+//    negative wait value (the negated current-tier lag);
+//  * otherwise, if a next tier exists, the result targets it with the reported waiting time.
+// Without tiering, a portion stored outside the default storage is sent back to it immediately.
+// Returns an empty optional when nothing has to be done or the column maximum is unavailable.
+std::optional<TTieringActualizer::TFullActualizationInfo> TTieringActualizer::BuildActualizationInfo(const TPortionInfo& portion, const TInstant now) const {
+    std::shared_ptr<ISnapshotSchema> portionSchema = VersionedIndex.GetSchema(portion.GetMinSnapshot());
+    std::shared_ptr<ISnapshotSchema> targetSchema = GetTargetSchema(portionSchema);
+    const TString& currentTierName = portion.GetTierNameDef(IStoragesManager::DefaultStorageId);
+
+    if (Tiering) {
+        AFL_VERIFY(TieringColumnId);
+        auto statOperator = portionSchema->GetIndexInfo().GetStatistics(NStatistics::TIdentifier(NStatistics::EType::Max, {*TieringColumnId}));
+        std::shared_ptr<arrow::Scalar> max;
+        if (!statOperator) {
+            max = portion.MaxValue(*TieringColumnId);
+            if (!max) {
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
+                return {};
+            }
+        } else {
+            NYDBTest::TControllers::GetColumnShardController()->OnStatisticsUsage(statOperator);
+            max = statOperator.GetScalarVerified(portion.GetMeta().GetStatisticsStorage());
+        }
+        auto tieringInfo = Tiering->GetTierToMove(max, now);
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString());
+        // Signed wait value: negative means the move is already overdue by that amount.
+        std::optional<i64> waitDurationValue;
+        TString targetTierName;
+        if (currentTierName != tieringInfo.GetCurrentTierName()) {
+            waitDurationValue = -static_cast<i64>(tieringInfo.GetCurrentTierLag().GetValue());
+            targetTierName = tieringInfo.GetCurrentTierName();
+        } else if (tieringInfo.GetNextTierName()) {
+            waitDurationValue = tieringInfo.GetNextTierWaitingVerified().GetValue();
+            targetTierName = tieringInfo.GetNextTierNameVerified();
+        }
+        if (waitDurationValue) {
+            auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(targetTierName);
+            auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName);
+            return TFullActualizationInfo(TRWAddress(std::move(storagesRead), std::move(storagesWrite)), targetTierName, *waitDurationValue, targetSchema);
+        }
+    } else if (currentTierName != IStoragesManager::DefaultStorageId) {
+        // No tiering configured: bring the portion back to the default storage without waiting.
+        auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(IStoragesManager::DefaultStorageId);
+        auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName);
+        TRWAddress address(std::move(storagesRead), std::move(storagesWrite));
+        return TFullActualizationInfo(std::move(address), IStoragesManager::DefaultStorageId, 0, targetSchema);
+    }
+    return {};
+}
+
+// Registers the portion in the wait-duration index if tiering requires moving it.
+// The index key is the wait duration shifted by the time elapsed since StartInstant, so keys of
+// portions added at different moments are comparable.
+void TTieringActualizer::DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) {
+    auto info = BuildActualizationInfo(portion, addContext.GetNow());
+    if (!info) {
+        return;
+    }
+    const TDuration indexKey = info->GetWaitDuration() + (addContext.GetNow() - StartInstant);
+    PortionIdByWaitDuration[info->GetAddress()][indexKey].emplace(portion.GetPortionId());
+    TRWAddress addressCopy = info->GetAddress();
+    PortionsInfo.emplace(portion.GetPortionId(), TFindActualizationInfo(std::move(addressCopy), indexKey));
+}
+
+// Unregisters the portion from the tiering indexes.
+// Portions that were never registered (no entry in PortionsInfo) are ignored.
+// The id is removed from its (address, wait duration) bucket, empty buckets are dropped and the
+// lookup record is removed from PortionsInfo.
+void TTieringActualizer::DoRemovePortion(const TPortionInfo& info) {
+    auto it = PortionsInfo.find(info.GetPortionId());
+    if (it == PortionsInfo.end()) {
+        return;
+    }
+    auto itAddress = PortionIdByWaitDuration.find(it->second.GetRWAddress());
+    AFL_VERIFY(itAddress != PortionIdByWaitDuration.end());
+    auto itDuration = itAddress->second.find(it->second.GetWaitDuration());
+    AFL_VERIFY(itDuration != itAddress->second.end());
+    AFL_VERIFY(itDuration->second.erase(info.GetPortionId()));
+    if (itDuration->second.empty()) {
+        itAddress->second.erase(itDuration);
+    }
+    if (itAddress->second.empty()) {
+        PortionIdByWaitDuration.erase(itAddress);
+    }
+    // Without this the stale record would make a repeated removal fail the verifications above
+    // and would make a later DoAddPortion for the same id silently skip the PortionsInfo update.
+    PortionsInfo.erase(it);
+}
+
+// Feeds portions whose wait duration has elapsed into `tasksContext`.
+// Addresses are processed independently: an address that is currently unavailable is skipped,
+// and a refused portion stops processing of its own address only.
+// The internal context is not used by this actualizer.
+void TTieringActualizer::DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) const {
+    for (auto&& [address, addressPortions] : PortionIdByWaitDuration) {
+        if (!tasksContext.IsRWAddressAvailable(address)) {
+            // Hash map order is arbitrary, so stopping here would starve the remaining addresses.
+            continue;
+        }
+        // Buckets are ordered by wait duration, so the first bucket that is not due ends the scan.
+        for (auto&& [duration, portions] : addressPortions) {
+            if (duration - (tasksContext.Now - StartInstant) > TDuration::Zero()) {
+                break;
+            }
+            bool limitEnriched = false;
+            for (auto&& p : portions) {
+                auto portion = externalContext.GetPortionVerified(p);
+                // Every registered portion is expected to still require actualization.
+                auto info = BuildActualizationInfo(*portion, tasksContext.Now);
+                AFL_VERIFY(info);
+                auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot());
+                TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
+                features.SetTargetTierName(info->GetTargetTierName());
+
+                // Portions leaving the default storage are held back while they are too fresh
+                // (the lag comes from the test controller, 60 minutes by default).
+                const TInstant maxChangePortionInstant = portion->RecordSnapshotMax().GetPlanInstant();
+                if (info->GetTargetTierName() != IStoragesManager::DefaultStorageId && portion->GetTierNameDef(IStoragesManager::DefaultStorageId) == IStoragesManager::DefaultStorageId) {
+                    if (tasksContext.Now - maxChangePortionInstant < NYDBTest::TControllers::GetColumnShardController()->GetLagForCompactionBeforeTierings(TDuration::Minutes(60))) {
+                        tasksContext.GetCounters().OnActualizationSkipTooFreshPortion(tasksContext.Now - maxChangePortionInstant);
+                        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", tasksContext.Now - maxChangePortionInstant);
+                        continue;
+                    }
+                }
+
+                if (!tasksContext.AddPortion(*portion, std::move(features), info->GetLateness())) {
+                    limitEnriched = true;
+                    break;
+                }
+            }
+            if (limitEnriched) {
+                break;
+            }
+        }
+    }
+}
+
+// Applies new tiering settings and rebuilds both indexes from the portions in `externalContext`.
+// StartInstant is reset to the context time, so index keys are counted from this moment.
+void TTieringActualizer::Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext) {
+    StartInstant = externalContext.GetNow();
+    Tiering = info;
+    TieringColumnId.reset();
+    if (Tiering) {
+        TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetTtlColumn());
+    }
+    TargetCriticalSchema = VersionedIndex.GetLastCriticalSchema();
+
+    PortionsInfo.clear();
+    PortionIdByWaitDuration.clear();
+    for (auto&& [portionId, portion] : externalContext.GetPortions()) {
+        AddPortion(portion, externalContext);
+    }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h
new file mode 100644
index 00000000000..26f521ce05b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h
@@ -0,0 +1,102 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h>
+#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h>
+
+namespace NKikimr::NOlap {
+class TTiering;
+}
+
+namespace NKikimr::NOlap::NActualizer {
+
+// Actualizer that tracks portions which must be moved to another tier (or back to the default
+// storage) and offers them for eviction once their wait duration has elapsed.
+class TTieringActualizer: public IActualizer {
+private:
+    // Full result of the tiering decision for one portion.
+    class TFullActualizationInfo {
+    private:
+        TRWAddress Address;
+        YDB_ACCESSOR_DEF(TString, TargetTierName);
+        YDB_ACCESSOR_DEF(ISnapshotSchema::TPtr, TargetScheme);
+        // Signed duration value: non-negative is time left to wait, negative is how late the move is.
+        i64 WaitDurationValue;
+    public:
+        // The signed value is printed through the clamped accessors; passing a negative value
+        // straight to TDuration::FromValue would print a huge bogus duration.
+        TString DebugString() const {
+            return TStringBuilder() << "{address=" << Address.DebugString() << ";target_tier=" << TargetTierName << ";wait_duration=" << GetWaitDuration() << ";lateness=" << GetLateness() << "}";
+        }
+
+        const TRWAddress& GetAddress() const {
+            return Address;
+        }
+
+        TFullActualizationInfo(TRWAddress&& address, const TString& targetTierName, const i64 waitDurationValue, const ISnapshotSchema::TPtr& targetScheme)
+            : Address(std::move(address))
+            , TargetTierName(targetTierName)
+            , TargetScheme(targetScheme)
+            , WaitDurationValue(waitDurationValue)
+        {
+
+        }
+
+        // Time left to wait; zero when the move is already due.
+        TDuration GetWaitDuration() const {
+            if (WaitDurationValue >= 0) {
+                return TDuration::FromValue(WaitDurationValue);
+            } else {
+                return TDuration::Zero();
+            }
+        }
+
+        // How overdue the move is; zero when it is not due yet.
+        TDuration GetLateness() const {
+            if (WaitDurationValue >= 0) {
+                return TDuration::Zero();
+            } else {
+                return TDuration::FromValue(-WaitDurationValue);
+            }
+        }
+    };
+
+    // Minimal per-portion record needed to locate the portion inside PortionIdByWaitDuration.
+    class TFindActualizationInfo {
+    private:
+        TRWAddress RWAddress;
+        YDB_READONLY_DEF(TDuration, WaitDuration);
+    public:
+        const TRWAddress& GetRWAddress() const {
+            return RWAddress;
+        }
+
+        TFindActualizationInfo(TRWAddress&& rwAddress, const TDuration waitDuration)
+            : RWAddress(std::move(rwAddress))
+            , WaitDuration(waitDuration) {
+
+        }
+    };
+
+    // Current tiering settings and the id of the tiering column; both assigned by Refresh().
+    std::optional<TTiering> Tiering;
+    std::optional<ui32> TieringColumnId;
+
+    std::shared_ptr<ISnapshotSchema> TargetCriticalSchema;
+    const ui64 PathId;
+    const TVersionedIndex& VersionedIndex;
+
+    // Reference point for the duration keys of PortionIdByWaitDuration; reset by Refresh().
+    TInstant StartInstant = TInstant::Zero();
+    // Portion ids grouped by read/write address and ordered by wait duration counted from StartInstant.
+    THashMap<TRWAddress, std::map<TDuration, THashSet<ui64>>> PortionIdByWaitDuration;
+    // Lookup records by portion id.
+    THashMap<ui64, TFindActualizationInfo> PortionsInfo;
+
+    std::shared_ptr<ISnapshotSchema> GetTargetSchema(const std::shared_ptr<ISnapshotSchema>& portionSchema) const;
+
+    // Returns the tiering decision for the portion at `now`, or an empty optional when none is needed.
+    std::optional<TFullActualizationInfo> BuildActualizationInfo(const TPortionInfo& portion, const TInstant now) const;
+
+    virtual void DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) override;
+    virtual void DoRemovePortion(const TPortionInfo& info) override;
+    virtual void DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) const override;
+
+public:
+    // Applies new tiering settings and rebuilds the indexes from the portions in `externalContext`.
+    void Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext);
+
+    TTieringActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex)
+        : PathId(pathId)
+        , VersionedIndex(versionedIndex)
+    {
+        // PathId is stored but not read anywhere in this class yet.
+        Y_UNUSED(PathId);
+    }
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/ya.make
new file mode 100644
index 00000000000..7ce5affd1ae
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+ tiering.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/engines/scheme/versions
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make
index 7b46cab8792..0689e11c04e 100644
--- a/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make
@@ -3,6 +3,10 @@ LIBRARY()
PEERDIR(
ydb/core/tx/columnshard/engines/storage/actualizer/index
ydb/core/tx/columnshard/engines/storage/actualizer/common
+ ydb/core/tx/columnshard/engines/storage/actualizer/abstract
+ ydb/core/tx/columnshard/engines/storage/actualizer/scheme
+ ydb/core/tx/columnshard/engines/storage/actualizer/tiering
+ ydb/core/tx/columnshard/engines/storage/actualizer/counters
)
END()
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 706f5d1abca..e8ee5ee4b24 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -70,7 +70,8 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port
} else {
OptimizerPlanner->StartModificationGuard().AddPortion(portionAfter);
}
- ActualizationIndex->AddPortion(portionAfter, HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now());
+ NActualizer::TAddExternalContext context(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions);
+ ActualizationIndex->AddPortion(portionAfter, context);
}
}
if (!!AdditiveSummaryCache) {
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 6b353ed0847..838dd7a6ad3 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -182,9 +182,8 @@ private:
YDB_READONLY(TMonotonic, LastCompactionInstant, TMonotonic::Zero());
public:
void RefreshTiering(const std::optional<TTiering>& tiering) {
- if (ActualizationIndex->RefreshTiering(tiering)) {
- ActualizationIndex->Rebuild(Portions);
- }
+ NActualizer::TAddExternalContext context(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions);
+ ActualizationIndex->RefreshTiering(tiering, context);
}
void StartActualizationIndex() {
@@ -204,7 +203,8 @@ public:
}
void BuildActualizationTasks(NActualizer::TTieringProcessContext& context) const {
- ActualizationIndex->BuildActualizationTasks(context, Portions);
+ NActualizer::TExternalTasksContext extTasks(Portions);
+ ActualizationIndex->BuildActualizationTasks(context, extTasks);
}
std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> self, const std::shared_ptr<NDataLocks::TManager>& locksManager) const {
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h
index 838b8120c7b..9ccaf1633b4 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.h
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.h
@@ -7,7 +7,7 @@
#include <ydb/core/tx/columnshard/counters/indexation.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
-#include <ydb/core/tx/columnshard/engines/storage/granule.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h
index ec911218475..4006c9c1fc1 100644
--- a/ydb/core/tx/columnshard/splitter/rb_splitter.h
+++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h
@@ -4,7 +4,6 @@
#include <ydb/core/tx/columnshard/counters/indexation.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
-#include <ydb/core/tx/columnshard/engines/storage/granule.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make
index dd076933faa..89f675d5f3f 100644
--- a/ydb/core/tx/columnshard/splitter/ut/ya.make
+++ b/ydb/core/tx/columnshard/splitter/ut/ya.make
@@ -9,6 +9,7 @@ PEERDIR(
ydb/core/tx/columnshard/counters
ydb/core/tx/columnshard/engines/portions
ydb/core/tx/columnshard/common
+ ydb/core/tx/columnshard/blobs_action
ydb/core/tx/columnshard/data_sharing
ydb/core/kqp/common
ydb/library/yql/parser/pg_wrapper
diff --git a/ydb/core/tx/columnshard/splitter/ya.make b/ydb/core/tx/columnshard/splitter/ya.make
index 22582303a82..7f86cada366 100644
--- a/ydb/core/tx/columnshard/splitter/ya.make
+++ b/ydb/core/tx/columnshard/splitter/ya.make
@@ -15,8 +15,8 @@ SRCS(
PEERDIR(
contrib/libs/apache/arrow
- ydb/core/tx/columnshard/engines/storage
ydb/core/tx/columnshard/splitter/abstract
+ ydb/core/tx/columnshard/engines/scheme
)
END()