diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-03-13 07:57:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-13 07:57:13 +0300 |
commit | 8500ceb8f4f8742530f7ca71c99e426ba3617e28 (patch) | |
tree | 90ddba80d62b2882e1e8d27c9ba72ed770ae7399 | |
parent | a9fc0430cb93c2c5488e752cc147360be35b37d8 (diff) | |
download | ydb-8500ceb8f4f8742530f7ca71c99e426ba3617e28.tar.gz |
split tiering and scheme actualizers (#2679)
28 files changed, 630 insertions, 445 deletions
diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp index 09da4e8826d..e089ba11c1e 100644 --- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp +++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp @@ -1,6 +1,7 @@ #include "context.h" #include <ydb/core/tx/columnshard/common/limits.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> +#include <ydb/core/tx/columnshard/data_locks/manager/manager.h> namespace NKikimr::NOlap::NActualizer { @@ -16,14 +17,12 @@ TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, cons } -bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait, const TInstant now) { - const TInstant maxChangePortionInstant = info.RecordSnapshotMax().GetPlanInstant(); - if (features.GetTargetTierName() != IStoragesManager::DefaultStorageId && info.GetTierNameDef(IStoragesManager::DefaultStorageId) == IStoragesManager::DefaultStorageId) { - if (now - maxChangePortionInstant < NYDBTest::TControllers::GetColumnShardController()->GetLagForCompactionBeforeTierings(TDuration::Minutes(60))) { - Counters.OnActualizationSkipTooFreshPortion(now - maxChangePortionInstant); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", now - maxChangePortionInstant); - return true; - } +bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait) { + if (!UsedPortions.emplace(info.GetAddress()).second) { + return true; + } + if (DataLocksManager->IsLocked(info)) { + return true; } const auto buildNewTask = [&]() { diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h index da972b86e18..0e5a5e326d3 100644 --- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h +++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h @@ -26,6 +26,7 @@ public: class TTieringProcessContext { private: + THashSet<TPortionAddress> UsedPortions; const ui64 MemoryUsageLimit; TSaverContext SaverContext; THashMap<TRWAddress, std::vector<TTaskConstructor>> Tasks; @@ -43,7 +44,7 @@ public: return Tasks; } - bool AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait, const TInstant now); + bool AddPortion(const TPortionInfo& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait); bool IsRWAddressAvailable(const TRWAddress& address) const { auto it = Tasks.find(address); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 5ed8dc6c84b..fc9052c3823 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -303,11 +303,7 @@ public: } bool CheckForCleanup(const TSnapshot& snapshot) const { - if (!HasRemoveSnapshot()) { - return false; - } else { - return GetRemoveSnapshotVerified() <= snapshot; - } + return IsRemovedFor(snapshot); } bool CheckForCleanup() const { diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.cpp new file mode 100644 index 00000000000..d193e92e3e1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.cpp @@ -0,0 +1,5 @@ +#include "abstract.h" + +namespace NKikimr::NOlap::NActualizer { + +} diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h new file mode 100644 index 00000000000..077ce3a2754 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h @@ -0,0 +1,34 @@ +#pragma once +#include "context.h" + +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> + +namespace NKikimr::NOlap::NActualizer { + +class IActualizer { +protected: + virtual void DoAddPortion(const TPortionInfo& info, const TAddExternalContext& context) = 0; + virtual void DoRemovePortion(const TPortionInfo& info) = 0; + virtual void DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) const = 0; +public: + virtual ~IActualizer() = default; + void BuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) { + return DoBuildTasks(tasksContext, externalContext, internalContext); + } + void AddPortion(const std::shared_ptr<TPortionInfo>& info, const TAddExternalContext& context) { + AFL_VERIFY(info); + if (info->HasRemoveSnapshot()) { + return; + } + return DoAddPortion(*info, context); + } + void RemovePortion(const std::shared_ptr<TPortionInfo>& info) { + AFL_VERIFY(info); + if (info->HasRemoveSnapshot()) { + return; + } + return DoRemovePortion(*info); + } +}; + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.cpp new file mode 100644 index 00000000000..6ef73de8ceb --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.cpp @@ -0,0 +1,5 @@ +#include "context.h" + +namespace NKikimr::NOlap::NActualizer { + +} diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.h b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.h new file mode 100644 index 00000000000..0c2ad3b78b3 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/context.h @@ -0,0 +1,58 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> + +#include <ydb/library/accessor/accessor.h> + +#include <util/generic/hash.h> + +namespace NKikimr::NOlap { +class TPortionInfo; +} + +namespace NKikimr::NOlap::NActualizer { + +class TTieringProcessContext; + +class TAddExternalContext { +private: + YDB_READONLY_DEF(TInstant, Now); + const THashMap<ui64, std::shared_ptr<TPortionInfo>>& Portions; +public: + TAddExternalContext(const TInstant now, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions) + : Now(now) + , Portions(portions) + { + + } + + const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const { + return Portions; + } +}; + +class TExternalTasksContext { +private: + const THashMap<ui64, std::shared_ptr<TPortionInfo>>& Portions; +public: + const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const { + return Portions; + } + + const std::shared_ptr<TPortionInfo>& GetPortionVerified(const ui64 portionId) const { + auto it = Portions.find(portionId); + AFL_VERIFY(it != Portions.end()); + return it->second; + } + + TExternalTasksContext(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions) + : Portions(portions) + { + + } +}; + +class TInternalTasksContext { +public: +}; + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/ya.make new file mode 100644 index 00000000000..90979469658 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/abstract/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + abstract.cpp + context.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme/versions +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.cpp new file mode 100644 index 00000000000..f6e4cca6431 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.cpp @@ -0,0 +1,5 @@ +#include "counters.h" + +namespace NKikimr::NOlap::NActualizer { + +} diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.h b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.h index 520befbb965..c796b62f2c0 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.h +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.h @@ -1,9 +1,6 @@ #pragma once -#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> -#include <ydb/core/formats/arrow/replace_key.h> -#include <ydb/library/accessor/accessor.h> -#include <ydb/core/tx/columnshard/splitter/settings.h> -#include <ydb/core/tx/columnshard/counters/engine_logs.h> +#include <ydb/core/tx/columnshard/counters/common/owner.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> namespace NKikimr::NOlap::NActualizer { diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/counters/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/ya.make new file mode 100644 index 00000000000..d73b370747b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/counters/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + counters.cpp +) + +PEERDIR( + ydb/library/actors/core + ydb/core/tx/columnshard/engines/portions + ydb/core/tx/columnshard/counters/common +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.cpp deleted file mode 100644 index 146a8700def..00000000000 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/counters.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "counters.h" - -namespace NKikimr::NOlap::NStorageOptimizer::NBuckets { - -} diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp index 739c1ebae2e..e4bd166e9c6 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.cpp @@ -1,249 +1,51 @@ #include "index.h" -#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h> -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h> -#include <ydb/core/tx/columnshard/data_locks/manager/manager.h> -#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> +#include <ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h> +#include <ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h> namespace NKikimr::NOlap::NActualizer { -void TGranuleActualizationIndex::BuildActualizationTasks(NActualizer::TTieringProcessContext& context, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portionInfos) const { - THashSet<ui64> usedPortions; - for (auto&& [address, addressPortions] : PortionIdByWaitDuration) { - if (!context.IsRWAddressAvailable(address)) { - break; - } - for (auto&& [duration, portions] : addressPortions) { - if (duration - (context.Now - StartInstant) > TDuration::Zero()) { - break; - } - bool limitEnriched = false; - for (auto&& p : portions) { - auto itPortionInfo = portionInfos.find(p); - AFL_VERIFY(itPortionInfo != portionInfos.end()); - if (context.DataLocksManager->IsLocked(*itPortionInfo->second)) { - continue; - } - auto portion = itPortionInfo->second; - - auto info = BuildActualizationInfo(portion, context.Now); - auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot()); - TPortionEvictionFeatures features(portionScheme, info.GetScheme() ? info.GetScheme()->GetTargetScheme() : portionScheme, portion->GetTierNameDef(IStoragesManager::DefaultStorageId)); - - std::optional<TDuration> lateness; - if (info.GetEviction()) { - features.SetTargetTierName(info.GetEviction()->GetTargetTierName()); - lateness = info.GetEviction()->GetLateness(); - } - - if (!context.AddPortion(*portion, std::move(features), lateness, context.Now)) { - limitEnriched = true; - break; - } - } - if (limitEnriched) { - break; - } - } - } - for (auto&& [address, portions] : PortionsToActualizeScheme) { - if (!context.IsRWAddressAvailable(address)) { - break; - } - for (auto&& portionId : portions) { - if (!usedPortions.emplace(portionId).second) { - continue; - } - auto itPortionInfo = portionInfos.find(portionId); - AFL_VERIFY(itPortionInfo != portionInfos.end()); - auto portion = itPortionInfo->second; - if (context.DataLocksManager->IsLocked(*portion)) { - continue; - } - - auto info = BuildActualizationInfo(portion, context.Now); - auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot()); - TPortionEvictionFeatures features(portionScheme, info.GetScheme() ? info.GetScheme()->GetTargetScheme() : portionScheme, portion->GetTierNameDef(IStoragesManager::DefaultStorageId)); - - if (!context.AddPortion(*portion, std::move(features), {}, context.Now)) { - break; - } - } +void TGranuleActualizationIndex::BuildActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const { + TInternalTasksContext internalContext; + for (auto&& i : Actualizers) { + i->BuildTasks(tasksContext, externalContext, internalContext); } } -TGranuleActualizationIndex::TActualizationInfo TGranuleActualizationIndex::BuildActualizationInfo(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) const { - const auto portionSchema = VersionedIndex.GetSchema(portion->GetMinSnapshot()); - auto targetSchema = ActualCriticalScheme ? ActualCriticalScheme : portionSchema; - const auto currentTierName = portion->GetTierNameDef(IStoragesManager::DefaultStorageId); - TActualizationInfo result; - if (Tiering) { - AFL_VERIFY(TieringColumnId); - auto statOperator = portionSchema->GetIndexInfo().GetStatistics(NStatistics::TIdentifier(NStatistics::EType::Max, {*TieringColumnId})); - std::shared_ptr<arrow::Scalar> max; - if (!statOperator) { - max = portion->MaxValue(*TieringColumnId); - if (!max) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); - return result; - } - } else { - NYDBTest::TControllers::GetColumnShardController()->OnStatisticsUsage(statOperator); - max = statOperator.GetScalarVerified(portion->GetMeta().GetStatisticsStorage()); - } - auto tieringInfo = Tiering->GetTierToMove(max, now); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString()); - std::optional<i64> d; - std::set<TString> storagesWrite; - TString targetTierName; - if (portion->GetTierNameDef(IStoragesManager::DefaultStorageId) != tieringInfo.GetCurrentTierName()) { - d = -1 * tieringInfo.GetCurrentTierLag().GetValue(); - targetTierName = tieringInfo.GetCurrentTierName(); - } else if (tieringInfo.GetNextTierName()) { - d = tieringInfo.GetNextTierWaitingVerified().GetValue(); - targetTierName = tieringInfo.GetNextTierNameVerified(); - } - if (d) { -// if (currentTierName == "deploy_logs_s3" && targetTierName == IStoragesManager::DefaultStorageId) { -// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId); -// AFL_VERIFY(false)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId); -// } - auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(targetTierName); - auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); - TRWAddress address(std::move(storagesRead), std::move(storagesWrite)); - result.SetEviction(std::move(address), targetTierName, *d); - if (*d < (i64)TDuration::Minutes(1).Seconds()) { - return result; - } - } - } else if (currentTierName != IStoragesManager::DefaultStorageId) { -// if (currentTierName == "deploy_logs_s3") { -// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("pathId", PathId); -// AFL_VERIFY(false)("pathId", PathId); -// } - auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(IStoragesManager::DefaultStorageId); - auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); - TRWAddress address(std::move(storagesRead), std::move(storagesWrite)); - result.SetEviction(std::move(address), IStoragesManager::DefaultStorageId, 0); - return result; - } - if (portionSchema->GetVersion() < targetSchema->GetVersion()) { - auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); - auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); - TRWAddress address(std::move(storagesRead), std::move(storagesWrite)); - result.SetScheme(std::move(address), targetSchema); - } - return result; -} - -void TGranuleActualizationIndex::AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) { - if (!Started) { - return; - } - if (portion->HasRemoveSnapshot()) { - return; - } - auto actualizationInfo = BuildActualizationInfo(portion, now); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("actualization_info", actualizationInfo.DebugString()); - TPortionIndexInfo result; - if (actualizationInfo.GetEviction()) { - auto& eviction = *actualizationInfo.GetEviction(); - PortionIdByWaitDuration[eviction.GetAddress()][eviction.GetWaitDuration() + (now - StartInstant)].emplace(portion->GetPortionId()); - auto address = eviction.GetAddress(); - result.SetEviction(std::move(address), eviction.GetWaitDuration() + (now - StartInstant)); - } - if (actualizationInfo.GetScheme()) { - auto& scheme = *actualizationInfo.GetScheme(); - PortionsToActualizeScheme[scheme.GetAddress()].emplace(portion->GetPortionId()); - auto address = scheme.GetAddress(); - result.SetScheme(std::move(address)); - } - result.AddCounters(Counters, portion); - if (!result.IsEmpty()) { - AFL_VERIFY(PortionsInfo.emplace(portion->GetPortionId(), std::move(result)).second); +void TGranuleActualizationIndex::AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TAddExternalContext& context) { + for (auto&& i : Actualizers) { + i->AddPortion(portion, context); } } void TGranuleActualizationIndex::RemovePortion(const std::shared_ptr<TPortionInfo>& portion) { - auto it = PortionsInfo.find(portion->GetPortionId()); - if (it == PortionsInfo.end()) { - return; - } - if (portion->HasRemoveSnapshot()) { - return; + for (auto&& i : Actualizers) { + i->RemovePortion(portion); } - if (it->second.GetEviction()) { - auto itAddress = PortionIdByWaitDuration.find(it->second.GetEviction()->GetRWAddress()); - AFL_VERIFY(itAddress != PortionIdByWaitDuration.end()); - auto itDuration = itAddress->second.find(it->second.GetEviction()->GetWaitDuration()); - AFL_VERIFY(itDuration != itAddress->second.end()); - AFL_VERIFY(itDuration->second.erase(portion->GetPortionId())); - if (itDuration->second.empty()) { - itAddress->second.erase(itDuration); - } - if (itAddress->second.empty()) { - PortionIdByWaitDuration.erase(itAddress); - } - } - - if (it->second.GetScheme()) { - auto itAddress = PortionsToActualizeScheme.find(it->second.GetScheme()->GetRWAddress()); - AFL_VERIFY(itAddress != PortionsToActualizeScheme.end()); - AFL_VERIFY(itAddress->second.erase(portion->GetPortionId())); - if (itAddress->second.empty()) { - PortionsToActualizeScheme.erase(itAddress); - } - } - it->second.RemoveCounters(Counters, portion); - PortionsInfo.erase(it); } -void TGranuleActualizationIndex::Rebuild(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions) { - if (!Started) { - return; - } - StartInstant = HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(); - PortionIdByWaitDuration.clear(); - PortionsToActualizeScheme.clear(); - PortionsInfo.clear(); - for (auto&& [_, portion] : portions) { - if (portion->HasRemoveSnapshot()) { - continue; - } - AddPortion(portion, StartInstant); - } +void TGranuleActualizationIndex::RefreshTiering(const std::optional<TTiering>& info, const TAddExternalContext& context) { + AFL_VERIFY(TieringActualizer); + TieringActualizer->Refresh(info, context); } -bool TGranuleActualizationIndex::RefreshTiering(const std::optional<TTiering>& info) { - AFL_VERIFY(Started); - Tiering = info; - if (Tiering) { - TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetTtlColumn()); - } else { - TieringColumnId = {}; - } - return true; -} - -bool TGranuleActualizationIndex::RefreshScheme() { - AFL_VERIFY(Started); - if (!ActualCriticalScheme) { - if (!VersionedIndex.GetLastCriticalSchema()) { - return false; - } - } else if (ActualCriticalScheme->GetVersion() == VersionedIndex.GetLastCriticalSchemaDef(ActualCriticalScheme)->GetVersion()) { - return false; - } - ActualCriticalScheme = VersionedIndex.GetLastCriticalSchema(); - return true; +void TGranuleActualizationIndex::RefreshScheme(const TAddExternalContext& context) { + AFL_VERIFY(SchemeActualizer); + SchemeActualizer->Refresh(context); } TGranuleActualizationIndex::TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex) : PathId(pathId) , VersionedIndex(versionedIndex) - , StartInstant(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now()) { Y_UNUSED(PathId); } +void TGranuleActualizationIndex::Start() { + AFL_VERIFY(Actualizers.empty()); + TieringActualizer = std::make_shared<TTieringActualizer>(PathId, VersionedIndex); + SchemeActualizer = std::make_shared<TSchemeActualizer>(PathId, VersionedIndex); + Actualizers.emplace_back(TieringActualizer); + Actualizers.emplace_back(SchemeActualizer); +} + } diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h index fbf3b0c6320..fb740f8b163 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/index.h @@ -1,212 +1,36 @@ #pragma once -#include "counters.h" - -#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h> -#include <ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h> -#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h> - -#include <ydb/core/base/appdata.h> - -#include <ydb/library/accessor/accessor.h> - -#include <util/datetime/base.h> -#include <util/generic/hash.h> -#include <util/generic/hash_set.h> -#include <map> +#include <ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h> +#include <ydb/core/tx/columnshard/engines/storage/actualizer/counters/counters.h> namespace NKikimr::NOlap { class TVersionedIndex; -class TPortionInfo; +class TTiering; } namespace NKikimr::NOlap::NActualizer { - -class TTieringProcessContext; +class TTieringActualizer; +class TSchemeActualizer; class TGranuleActualizationIndex { private: TCounters Counters; + std::vector<std::shared_ptr<IActualizer>> Actualizers; - class TPortionIndexInfo { - public: - class TEviction { - private: - TRWAddress RWAddress; - YDB_READONLY_DEF(TDuration, WaitDuration); - public: - const TRWAddress& GetRWAddress() const { - return RWAddress; - } - - TEviction(TRWAddress&& rwAddress, const TDuration waitDuration) - : RWAddress(std::move(rwAddress)) - , WaitDuration(waitDuration) - { - - } - }; - - class TScheme { - private: - TRWAddress RWAddress; - public: - const TRWAddress& GetRWAddress() const { - return RWAddress; - } - - TScheme(TRWAddress&& rwAddress) - : RWAddress(std::move(rwAddress)) - { - - } - }; - private: - YDB_READONLY_DEF(std::optional<TEviction>, Eviction); - YDB_READONLY_DEF(std::optional<TScheme>, Scheme); - public: - TPortionIndexInfo() = default; - - bool IsEmpty() const { - return !Eviction && !Scheme; - } - - void SetEviction(TRWAddress&& address, const TDuration waitDuration) { - AFL_VERIFY(!Eviction); - Eviction = TEviction(std::move(address), waitDuration); - } - - void SetScheme(TRWAddress&& address) { - AFL_VERIFY(!Scheme); - Scheme = TScheme(std::move(address)); - } - - void AddCounters(TCounters& counters, const std::shared_ptr<TPortionInfo>& info) const { - counters.PortionsToSyncSchema->AddPortion(info); - } - void RemoveCounters(TCounters& counters, const std::shared_ptr<TPortionInfo>& info) const { - counters.PortionsToSyncSchema->RemovePortion(info); - } - }; - - THashMap<TRWAddress, std::map<TDuration, THashSet<ui64>>> PortionIdByWaitDuration; - THashMap<ui64, TPortionIndexInfo> PortionsInfo; - THashMap<TRWAddress, THashSet<ui64>> PortionsToActualizeScheme; + std::shared_ptr<TTieringActualizer> TieringActualizer; + std::shared_ptr<TSchemeActualizer> SchemeActualizer; const ui64 PathId; const TVersionedIndex& VersionedIndex; - - ISnapshotSchema::TPtr ActualCriticalScheme; - std::optional<TTiering> Tiering; - std::optional<ui32> TieringColumnId; - TInstant StartInstant = TInstant::Zero(); - bool Started = false; public: - void Start() { - AFL_VERIFY(!Started); - Started = true; - } - - class TActualizationInfo { - public: - class TEvictionInfo { - private: - TRWAddress Address; - YDB_ACCESSOR_DEF(TString, TargetTierName); - i64 WaitDurationValue; - public: - TString DebugString() const { - return TStringBuilder() << "{address=" << Address.DebugString() << ";target_tier=" << TargetTierName << ";wait_duration=" << TDuration::FromValue(WaitDurationValue) << "}"; - } - - const TRWAddress& GetAddress() const { - return Address; - } - - TEvictionInfo(TRWAddress&& address, const TString& targetTierName, const i64 waitDurationValue) - : Address(std::move(address)) - , TargetTierName(targetTierName) - , WaitDurationValue(waitDurationValue) - { - - } - - TDuration GetWaitDuration() const { - if (WaitDurationValue >= 0) { - return TDuration::FromValue(WaitDurationValue); - } else { - return TDuration::Zero(); - } - } - - TDuration GetLateness() const { - if (WaitDurationValue >= 0) { - return TDuration::Zero(); - } else { - return TDuration::FromValue(-WaitDurationValue); - } - } - }; - - class TSchemeInfo { - private: - TRWAddress Address; - YDB_ACCESSOR_DEF(std::shared_ptr<ISnapshotSchema>, TargetScheme); - public: - TString DebugString() const { - return TStringBuilder() << "{address=" << Address.DebugString() << ";target_scheme=" << TargetScheme->DebugString() << "}"; - } - - const TRWAddress& GetAddress() const { - return Address; - } - - TSchemeInfo(TRWAddress&& address, const std::shared_ptr<ISnapshotSchema>& targetScheme) - : Address(std::move(address)) - , TargetScheme(targetScheme) { - - } - }; - private: - YDB_READONLY_DEF(std::optional<TEvictionInfo>, Eviction); - YDB_READONLY_DEF(std::optional<TSchemeInfo>, Scheme); - public: - TActualizationInfo() = default; - - TString DebugString() const { - TStringBuilder sb; - if (Eviction) { - sb << "EVICTION=" << Eviction->DebugString() << ";"; - } - if (Scheme) { - sb << "SCHEME=" << Scheme->DebugString() << ";"; - } - return sb; - } - - void SetEviction(TRWAddress&& address, const TString& targetTierName, const i64 waitDurationValue) { - AFL_VERIFY(!Eviction); - Eviction = TEvictionInfo(std::move(address), targetTierName, waitDurationValue); - } - - void SetScheme(TRWAddress&& address, const std::shared_ptr<ISnapshotSchema>& targetScheme) { - AFL_VERIFY(!Scheme); - Scheme = TSchemeInfo(std::move(address), targetScheme); - } - }; - + void Start(); TGranuleActualizationIndex(const ui64 pathId, const TVersionedIndex& versionedIndex); - void BuildActualizationTasks(TTieringProcessContext& context, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portionInfos) const; - - [[nodiscard]] bool RefreshTiering(const std::optional<TTiering>& info); - [[nodiscard]] bool RefreshScheme(); - - void Rebuild(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions); - - TActualizationInfo BuildActualizationInfo(const std::shared_ptr<TPortionInfo>& portion, const TInstant now) const; + void BuildActualizationTasks(TTieringProcessContext& tasksContext, const NActualizer::TExternalTasksContext& externalContext) const; - void AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TInstant now); + void RefreshTiering(const std::optional<TTiering>& info, const TAddExternalContext& context); + void RefreshScheme(const TAddExternalContext& context); + void AddPortion(const std::shared_ptr<TPortionInfo>& portion, const TAddExternalContext& context); void RemovePortion(const std::shared_ptr<TPortionInfo>& portion); }; diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make index ff8f03727d0..8308be890dc 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/index/ya.make @@ -2,7 +2,6 @@ LIBRARY() SRCS( index.cpp - counters.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp new file mode 100644 index 00000000000..71a6aa6d49e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.cpp @@ -0,0 +1,81 @@ +#include "scheme.h" +#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h> +#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h> +#include <ydb/core/tx/columnshard/data_locks/manager/manager.h> + +namespace NKikimr::NOlap::NActualizer { + +std::optional<NKikimr::NOlap::NActualizer::TSchemeActualizer::TFullActualizationInfo> TSchemeActualizer::BuildActualizationInfo(const TPortionInfo& portion) const { + AFL_VERIFY(TargetSchema); + const TString& currentTierName = portion.GetTierNameDef(IStoragesManager::DefaultStorageId); + auto portionSchema = VersionedIndex.GetSchema(portion.GetMinSnapshot()); + if (portionSchema->GetVersion() < TargetSchema->GetVersion()) { + auto storagesWrite = TargetSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); + auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); + TRWAddress address(std::move(storagesRead), std::move(storagesWrite)); + return TFullActualizationInfo(std::move(address), TargetSchema); + } + return {}; +} + +void TSchemeActualizer::DoAddPortion(const TPortionInfo& info, const TAddExternalContext& /*context*/) { + if (!TargetSchema) { + return; + } + auto actualizationInfo = BuildActualizationInfo(info); + if (!actualizationInfo) { + return; + } + PortionsToActualizeScheme[actualizationInfo->GetAddress()].emplace(info.GetPortionId()); + PortionsInfo.emplace(info.GetPortionId(), actualizationInfo->ExtractFindId()); +} + +void TSchemeActualizer::DoRemovePortion(const TPortionInfo& info) { + auto it = PortionsInfo.find(info.GetPortionId()); + if (it == PortionsInfo.end()) { + return; + } + + auto itAddress = PortionsToActualizeScheme.find(it->second.GetRWAddress()); + AFL_VERIFY(itAddress != PortionsToActualizeScheme.end()); + AFL_VERIFY(itAddress->second.erase(info.GetPortionId())); + if (itAddress->second.empty()) { + PortionsToActualizeScheme.erase(itAddress); + } +} + +void TSchemeActualizer::DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) const { + for (auto&& [address, portions] : PortionsToActualizeScheme) { + if (!tasksContext.IsRWAddressAvailable(address)) { + break; + } + for (auto&& portionId : portions) { + auto portion = externalContext.GetPortionVerified(portionId); + auto info = BuildActualizationInfo(*portion); + AFL_VERIFY(info); + auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot()); + TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId)); + + if (!tasksContext.AddPortion(*portion, std::move(features), {})) { + break; + } + } + } +} + +void TSchemeActualizer::Refresh(const TAddExternalContext& externalContext) { + TargetSchema = VersionedIndex.GetLastCriticalSchema(); + if (!TargetSchema) { + AFL_VERIFY(PortionsInfo.empty()); + } else { + PortionsInfo.clear(); + PortionsToActualizeScheme.clear(); + for (auto&& i : externalContext.GetPortions()) { + AddPortion(i.second, externalContext); + } + } +} + +} diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h new file mode 100644 index 00000000000..55e96745393 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/scheme.h @@ -0,0 +1,71 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h> +#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h> +#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h> + +namespace NKikimr::NOlap::NActualizer { + +class TSchemeActualizer: public IActualizer { +private: + THashMap<TRWAddress, THashSet<ui64>> PortionsToActualizeScheme; + std::shared_ptr<ISnapshotSchema> TargetSchema; + const ui64 PathId; + const TVersionedIndex& VersionedIndex; + + class TFindActualizationInfo { + private: + TRWAddress RWAddress; + public: + const TRWAddress& GetRWAddress() const { + return RWAddress; + } + + TFindActualizationInfo(TRWAddress&& rwAddress) + : RWAddress(std::move(rwAddress)) { + + } + }; + + THashMap<ui64, TFindActualizationInfo> PortionsInfo; + + class TFullActualizationInfo { + private: + TRWAddress Address; + YDB_ACCESSOR_DEF(std::shared_ptr<ISnapshotSchema>, TargetScheme); + public: + TFindActualizationInfo ExtractFindId() { + return TFindActualizationInfo(std::move(Address)); + } + + TString DebugString() const { + return TStringBuilder() << "{address=" << Address.DebugString() << ";target_scheme=" << TargetScheme->DebugString() << "}"; + } + + const TRWAddress& GetAddress() const { + return Address; + } + + TFullActualizationInfo(TRWAddress&& address, const std::shared_ptr<ISnapshotSchema>& targetScheme) + : Address(std::move(address)) + , TargetScheme(targetScheme) { + + } + }; + + std::optional<TFullActualizationInfo> BuildActualizationInfo(const TPortionInfo& portion) const; + +protected: + virtual void DoAddPortion(const TPortionInfo& info, const TAddExternalContext& context) override; + virtual void DoRemovePortion(const TPortionInfo& info) override; + virtual void DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) const override; +public: + void Refresh(const TAddExternalContext& externalContext); + + TSchemeActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex) + : PathId(pathId) + , VersionedIndex(versionedIndex) { + Y_UNUSED(PathId); + } +}; + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/ya.make new file mode 100644 index 00000000000..f987d972103 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/scheme/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + scheme.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme/versions +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp new file mode 100644 index 00000000000..4db06e8d421 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.cpp @@ -0,0 +1,159 @@ +#include "tiering.h" +#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/changes/actualization/construction/context.h> +#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h> +#include <ydb/core/tx/columnshard/data_locks/manager/manager.h> +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> + +namespace NKikimr::NOlap::NActualizer { + +std::shared_ptr<NKikimr::NOlap::ISnapshotSchema> TTieringActualizer::GetTargetSchema(const std::shared_ptr<ISnapshotSchema>& portionSchema) const { + if (!TargetCriticalSchema) { + return portionSchema; + } + if (portionSchema->GetVersion() < TargetCriticalSchema->GetVersion()) { + return TargetCriticalSchema; + } + return portionSchema; +} + +std::optional<TTieringActualizer::TFullActualizationInfo> TTieringActualizer::BuildActualizationInfo(const TPortionInfo& portion, const TInstant now) const { + std::shared_ptr<ISnapshotSchema> portionSchema = VersionedIndex.GetSchema(portion.GetMinSnapshot()); + std::shared_ptr<ISnapshotSchema> targetSchema = GetTargetSchema(portionSchema); + const TString& currentTierName = portion.GetTierNameDef(IStoragesManager::DefaultStorageId); + + if (Tiering) { + AFL_VERIFY(TieringColumnId); + auto statOperator = portionSchema->GetIndexInfo().GetStatistics(NStatistics::TIdentifier(NStatistics::EType::Max, {*TieringColumnId})); + std::shared_ptr<arrow::Scalar> max; + if (!statOperator) { + max = portion.MaxValue(*TieringColumnId); + if (!max) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max"); + return {}; + } + } else { + NYDBTest::TControllers::GetColumnShardController()->OnStatisticsUsage(statOperator); + max = statOperator.GetScalarVerified(portion.GetMeta().GetStatisticsStorage()); + } + auto tieringInfo = Tiering->GetTierToMove(max, now); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString()); + std::optional<i64> d; + std::set<TString> storagesWrite; + TString targetTierName; + if (portion.GetTierNameDef(IStoragesManager::DefaultStorageId) != tieringInfo.GetCurrentTierName()) { + d = -1 * tieringInfo.GetCurrentTierLag().GetValue(); + targetTierName = tieringInfo.GetCurrentTierName(); + } else if (tieringInfo.GetNextTierName()) { + d = tieringInfo.GetNextTierWaitingVerified().GetValue(); + targetTierName = tieringInfo.GetNextTierNameVerified(); + } + if (d) { + // if (currentTierName == "deploy_logs_s3" && targetTierName == IStoragesManager::DefaultStorageId) { + // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId); + // AFL_VERIFY(false)("tiering_info", tieringInfo.DebugString())("max", max->ToString())("now", now.ToString())("d", *d)("tiering", Tiering->GetDebugString())("pathId", PathId); + // } + auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(targetTierName); + auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); + return TFullActualizationInfo(TRWAddress(std::move(storagesRead), std::move(storagesWrite)), targetTierName, *d, targetSchema); + } + } else if (currentTierName != IStoragesManager::DefaultStorageId) { + // if (currentTierName == "deploy_logs_s3") { + // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("pathId", PathId); + // AFL_VERIFY(false)("pathId", PathId); + // } + auto storagesWrite = targetSchema->GetIndexInfo().GetUsedStorageIds(IStoragesManager::DefaultStorageId); + auto storagesRead = portionSchema->GetIndexInfo().GetUsedStorageIds(currentTierName); + TRWAddress address(std::move(storagesRead), std::move(storagesWrite)); + return TFullActualizationInfo(std::move(address), IStoragesManager::DefaultStorageId, 0, targetSchema); + } + return {}; +} + +void TTieringActualizer::DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) { + auto info = BuildActualizationInfo(portion, addContext.GetNow()); + if (!info) { + return; + } + PortionIdByWaitDuration[info->GetAddress()][info->GetWaitDuration() + (addContext.GetNow() - StartInstant)].emplace(portion.GetPortionId()); + auto address = info->GetAddress(); + TFindActualizationInfo findId(std::move(address), info->GetWaitDuration() + (addContext.GetNow() - StartInstant)); + PortionsInfo.emplace(portion.GetPortionId(), std::move(findId)); +} + +void TTieringActualizer::DoRemovePortion(const TPortionInfo& info) { + auto it = PortionsInfo.find(info.GetPortionId()); + if (it == PortionsInfo.end()) { + return; + } + auto itAddress = PortionIdByWaitDuration.find(it->second.GetRWAddress()); + AFL_VERIFY(itAddress != PortionIdByWaitDuration.end()); + auto itDuration = itAddress->second.find(it->second.GetWaitDuration()); + AFL_VERIFY(itDuration != itAddress->second.end()); + AFL_VERIFY(itDuration->second.erase(info.GetPortionId())); + if (itDuration->second.empty()) { + itAddress->second.erase(itDuration); + } + if (itAddress->second.empty()) { + PortionIdByWaitDuration.erase(itAddress); + } +} + +void TTieringActualizer::DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& /*internalContext*/) const { + for (auto&& [address, addressPortions] : PortionIdByWaitDuration) { + if (!tasksContext.IsRWAddressAvailable(address)) { + break; + } + for (auto&& [duration, portions] : addressPortions) { + if (duration - (tasksContext.Now - StartInstant) > TDuration::Zero()) { + break; + } + bool limitEnriched = false; + for (auto&& p : portions) { + auto portion = externalContext.GetPortionVerified(p); + auto info = BuildActualizationInfo(*portion, tasksContext.Now); + AFL_VERIFY(info); + auto portionScheme = VersionedIndex.GetSchema(portion->GetMinSnapshot()); + TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId)); + features.SetTargetTierName(info->GetTargetTierName()); + + const TInstant maxChangePortionInstant = portion->RecordSnapshotMax().GetPlanInstant(); + if (info->GetTargetTierName() != IStoragesManager::DefaultStorageId && portion->GetTierNameDef(IStoragesManager::DefaultStorageId) == IStoragesManager::DefaultStorageId) { + if (tasksContext.Now - maxChangePortionInstant < NYDBTest::TControllers::GetColumnShardController()->GetLagForCompactionBeforeTierings(TDuration::Minutes(60))) { + tasksContext.GetCounters().OnActualizationSkipTooFreshPortion(tasksContext.Now - maxChangePortionInstant); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", tasksContext.Now - maxChangePortionInstant); + continue; + } + } + + if (!tasksContext.AddPortion(*portion, std::move(features), info->GetLateness())) { + limitEnriched = true; + break; + } + } + if (limitEnriched) { + break; + } + } + } +} + +void TTieringActualizer::Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext) { + StartInstant = externalContext.GetNow(); + Tiering = info; + if (Tiering) { + TieringColumnId = VersionedIndex.GetLastSchema()->GetColumnId(Tiering->GetTtlColumn()); + } else { + TieringColumnId = {}; + } + TargetCriticalSchema = VersionedIndex.GetLastCriticalSchema(); + PortionsInfo.clear(); + PortionIdByWaitDuration.clear(); + + for (auto&& i : externalContext.GetPortions()) { + AddPortion(i.second, externalContext); + } +} + +} diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h new file mode 100644 index 00000000000..26f521ce05b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/tiering.h @@ -0,0 +1,102 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/storage/actualizer/abstract/abstract.h> +#include <ydb/core/tx/columnshard/engines/storage/actualizer/common/address.h> +#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h> +#include <ydb/core/tx/columnshard/engines/scheme/tiering/tier_info.h> + +namespace NKikimr::NOlap { +class TTiering; +} + +namespace NKikimr::NOlap::NActualizer { + +class TTieringActualizer: public IActualizer { +private: + class TFullActualizationInfo { + private: + TRWAddress Address; + YDB_ACCESSOR_DEF(TString, TargetTierName); + YDB_ACCESSOR_DEF(ISnapshotSchema::TPtr, TargetScheme); + i64 WaitDurationValue; + public: + TString DebugString() const { + return TStringBuilder() << "{address=" << Address.DebugString() << ";target_tier=" << TargetTierName << ";wait_duration=" << TDuration::FromValue(WaitDurationValue) << "}"; + } + + const TRWAddress& GetAddress() const { + return Address; + } + + TFullActualizationInfo(TRWAddress&& address, const TString& targetTierName, const i64 waitDurationValue, const ISnapshotSchema::TPtr& targetScheme) + : Address(std::move(address)) + , TargetTierName(targetTierName) + , TargetScheme(targetScheme) + , WaitDurationValue(waitDurationValue) + { + + } + + TDuration GetWaitDuration() const { + if (WaitDurationValue >= 0) { + return TDuration::FromValue(WaitDurationValue); + } else { + return TDuration::Zero(); + } + } + + TDuration GetLateness() const { + if (WaitDurationValue >= 0) { + return TDuration::Zero(); + } else { + return TDuration::FromValue(-WaitDurationValue); + } + } + }; + + class TFindActualizationInfo { + private: + TRWAddress RWAddress; + YDB_READONLY_DEF(TDuration, WaitDuration); + public: + const TRWAddress& GetRWAddress() const { + return RWAddress; + } + + TFindActualizationInfo(TRWAddress&& rwAddress, const TDuration waitDuration) + : RWAddress(std::move(rwAddress)) + , WaitDuration(waitDuration) { + + } + }; + + std::optional<TTiering> Tiering; + std::optional<ui32> TieringColumnId; + + std::shared_ptr<ISnapshotSchema> TargetCriticalSchema; + const ui64 PathId; + const TVersionedIndex& VersionedIndex; + + TInstant StartInstant = TInstant::Zero(); + THashMap<TRWAddress, std::map<TDuration, THashSet<ui64>>> PortionIdByWaitDuration; + THashMap<ui64, TFindActualizationInfo> PortionsInfo; + + std::shared_ptr<ISnapshotSchema> GetTargetSchema(const std::shared_ptr<ISnapshotSchema>& portionSchema) const; + + std::optional<TFullActualizationInfo> BuildActualizationInfo(const TPortionInfo& portion, const TInstant now) const; + + virtual void DoAddPortion(const TPortionInfo& portion, const TAddExternalContext& addContext) override; + virtual void DoRemovePortion(const TPortionInfo& info) override; + virtual void DoBuildTasks(TTieringProcessContext& tasksContext, const TExternalTasksContext& externalContext, TInternalTasksContext& internalContext) const override; + +public: + void Refresh(const std::optional<TTiering>& info, const TAddExternalContext& externalContext); + + TTieringActualizer(const ui64 pathId, const TVersionedIndex& versionedIndex) + : PathId(pathId) + , VersionedIndex(versionedIndex) + { + Y_UNUSED(PathId); + } +}; + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/ya.make new file mode 100644 index 00000000000..7ce5affd1ae --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/tiering/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + tiering.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme/versions +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make b/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make index 7b46cab8792..0689e11c04e 100644 --- a/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/actualizer/ya.make @@ -3,6 +3,10 @@ LIBRARY() PEERDIR( ydb/core/tx/columnshard/engines/storage/actualizer/index ydb/core/tx/columnshard/engines/storage/actualizer/common + ydb/core/tx/columnshard/engines/storage/actualizer/abstract + ydb/core/tx/columnshard/engines/storage/actualizer/scheme + ydb/core/tx/columnshard/engines/storage/actualizer/tiering + ydb/core/tx/columnshard/engines/storage/actualizer/counters ) END() diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 706f5d1abca..e8ee5ee4b24 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -70,7 +70,8 @@ void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> port } else { OptimizerPlanner->StartModificationGuard().AddPortion(portionAfter); } - ActualizationIndex->AddPortion(portionAfter, HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now()); + NActualizer::TAddExternalContext context(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions); + ActualizationIndex->AddPortion(portionAfter, context); } } if (!!AdditiveSummaryCache) { diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 6b353ed0847..838dd7a6ad3 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -182,9 +182,8 @@ private: YDB_READONLY(TMonotonic, LastCompactionInstant, TMonotonic::Zero()); public: void RefreshTiering(const std::optional<TTiering>& tiering) { - if (ActualizationIndex->RefreshTiering(tiering)) { - ActualizationIndex->Rebuild(Portions); - } + NActualizer::TAddExternalContext context(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions); + ActualizationIndex->RefreshTiering(tiering, context); } void StartActualizationIndex() { @@ -204,7 +203,8 @@ public: } void BuildActualizationTasks(NActualizer::TTieringProcessContext& context) const { - ActualizationIndex->BuildActualizationTasks(context, Portions); + NActualizer::TExternalTasksContext extTasks(Portions); + ActualizationIndex->BuildActualizationTasks(context, extTasks); } std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> self, const std::shared_ptr<NDataLocks::TManager>& locksManager) const { diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index 838b8120c7b..9ccaf1633b4 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -7,7 +7,7 @@ #include <ydb/core/tx/columnshard/counters/indexation.h> #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> -#include <ydb/core/tx/columnshard/engines/storage/granule.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h index ec911218475..4006c9c1fc1 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.h +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h @@ -4,7 +4,6 @@ #include <ydb/core/tx/columnshard/counters/indexation.h> #include <ydb/core/tx/columnshard/engines/scheme/column_features.h> #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> -#include <ydb/core/tx/columnshard/engines/storage/granule.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make index dd076933faa..89f675d5f3f 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ya.make +++ b/ydb/core/tx/columnshard/splitter/ut/ya.make @@ -9,6 +9,7 @@ PEERDIR( ydb/core/tx/columnshard/counters ydb/core/tx/columnshard/engines/portions ydb/core/tx/columnshard/common + ydb/core/tx/columnshard/blobs_action ydb/core/tx/columnshard/data_sharing ydb/core/kqp/common ydb/library/yql/parser/pg_wrapper diff --git a/ydb/core/tx/columnshard/splitter/ya.make b/ydb/core/tx/columnshard/splitter/ya.make index 22582303a82..7f86cada366 100644 --- a/ydb/core/tx/columnshard/splitter/ya.make +++ b/ydb/core/tx/columnshard/splitter/ya.make @@ -15,8 +15,8 @@ SRCS( PEERDIR( contrib/libs/apache/arrow - ydb/core/tx/columnshard/engines/storage ydb/core/tx/columnshard/splitter/abstract + ydb/core/tx/columnshard/engines/scheme ) END() |