diff options
| author | ivanmorozov333 <[email protected]> | 2024-12-04 17:17:54 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-12-04 17:17:54 +0300 |
| commit | 4f34c1a2f1ac7840c21fc84d5d6c35f7972f7a9b (patch) | |
| tree | 3e16cb754f7153ae57e823579f497c0a6b97c0a3 | |
| parent | ff307c7192bfd67f89b728fcec70c7ffb0f22b17 (diff) | |
compaction. lc levels configuration (#12273)
11 files changed, 227 insertions, 17 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 93e1d345b92..9ee043258b8 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -2699,6 +2699,18 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } { + auto alterQuery = + TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=` + {"levels" : [{"class_name" : "Zero", "portions_live_duration" : "180s", "expected_blobs_size" : 2048000}, + {"class_name" : "Zero", "expected_blobs_size" : 2048000}, {"class_name" : "Zero"}]}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + { auto it = tableClient.StreamExecuteScanQuery(R"( --!syntax_v1 diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 50505ae2f27..1ebb1a67bab 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -460,6 +460,20 @@ message TStorageTierConfig { optional TCompressionOptions Compression = 3; } +message TCompactionLevelConstructorContainer { + optional string ClassName = 1; + + message TZeroLevel { + optional uint32 PortionsLiveDurationSeconds = 1; + optional uint64 ExpectedBlobsSize = 2; + } + + oneof Implementation { + TZeroLevel ZeroLevel = 10; + } + +} + message TCompactionPlannerConstructorContainer { optional string ClassName = 1; @@ -473,7 +487,7 @@ message TCompactionPlannerConstructorContainer { } message TLCOptimizer { - + repeated TCompactionLevelConstructorContainer Levels = 1; } oneof Implementation { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp index ed41f5de42f..bf813d2cc68 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { NKikimr::TConclusion<std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::IOptimizerPlanner>> TOptimizerPlannerConstructor::DoBuildPlanner(const TBuildContext& context) const { - return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema()); + return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema(), Levels); } bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& current) const { @@ -23,6 +23,9 @@ bool TOptimizerPlannerConstructor::DoIsEqualTo(const IOptimizerPlannerConstructo void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const { *proto.MutableLCBuckets() = NKikimrSchemeOp::TCompactionPlannerConstructorContainer::TLCOptimizer(); + for (auto&& i : Levels) { + *proto.MutableLCBuckets()->AddLevels() = i.SerializeToProto(); + } } bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) { @@ -30,7 +33,40 @@ bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) { AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-buckets optimizer from proto")("proto", proto.DebugString()); return false; } + for (auto&& i : proto.GetLCBuckets().GetLevels()) { + TLevelConstructorContainer lContainer; + if (!lContainer.DeserializeFromProto(i)) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-bucket level")("proto", i.DebugString()); + return false; + } + Levels.emplace_back(std::move(lContainer)); + } return true; } +NKikimr::TConclusionStatus TOptimizerPlannerConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + if (!jsonInfo.Has("levels")) { + return TConclusionStatus::Fail("no levels description"); + } + if (!jsonInfo["levels"].IsArray()) { + return TConclusionStatus::Fail("levels have to been array in json description"); + } + auto& arr = jsonInfo["levels"].GetArray(); + if (!arr.size()) { + return TConclusionStatus::Fail("no objects in json array 'levels'"); + } + for (auto&& i : arr) { + const auto className = i["class_name"].GetStringRobust(); + auto level = ILevelConstructor::TFactory::MakeHolder(className); + if (!level) { + return TConclusionStatus::Fail("incorrect level class_name: " + className); + } + if (!level->DeserializeFromJson(i["description"])) { + return TConclusionStatus::Fail("cannot parse level: " + className + ": " + i["description"].GetStringRobust()); + } + Levels.emplace_back(TLevelConstructorContainer(std::shared_ptr<ILevelConstructor>(level.Release()))); + } + return TConclusionStatus::Success(); +} + } // namespace NKikimr::NOlap::NStorageOptimizer::NLBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h index f1d47481c17..f85249435ea 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h @@ -1,31 +1,75 @@ #pragma once #include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { +class ILevelConstructor { +private: + virtual std::shared_ptr<IPortionsLevel> DoBuildLevel( + const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const = 0; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const = 0; + +public: + using TFactory = NObjectFactory::TObjectFactory<ILevelConstructor, TString>; + using TProto = NKikimrSchemeOp::TCompactionLevelConstructorContainer; + + virtual ~ILevelConstructor() = default; + + std::shared_ptr<IPortionsLevel> BuildLevel( + const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const { + return DoBuildLevel(nextLevel, indexLevel, counters); + } + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) { + return DoDeserializeFromJson(json); + } + + bool DeserializeFromProto(const TProto& proto) { + return DoDeserializeFromProto(proto); + } + void SerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const { + return DoSerializeToProto(proto); + } + virtual TString GetClassName() const = 0; +}; + +class TLevelConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor> { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor>; + +public: + using TBase::TBase; +}; + class TOptimizerPlannerConstructor: public IOptimizerPlannerConstructor { public: static TString GetClassNameStatic() { return "lc-buckets"; } + private: - static inline const TFactory::TRegistrator<TOptimizerPlannerConstructor> Registrator = TFactory::TRegistrator<TOptimizerPlannerConstructor>(GetClassNameStatic()); + std::vector<TLevelConstructorContainer> Levels; + + static inline const TFactory::TRegistrator<TOptimizerPlannerConstructor> Registrator = + TFactory::TRegistrator<TOptimizerPlannerConstructor>(GetClassNameStatic()); virtual void DoSerializeToProto(TProto& proto) const override; virtual bool DoDeserializeFromProto(const TProto& proto) override; - virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& /*jsonInfo*/) override { - return TConclusionStatus::Success(); - } + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const override; virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const override; virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const override; + public: virtual TString GetClassName() const override { return GetClassNameStatic(); } - }; -} // namespace NKikimr::NOlap::NStorageOptimizer::NLBuckets +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make index f95d3abf746..86918b52199 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( GLOBAL constructor.cpp + GLOBAL zero_level.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp new file mode 100644 index 00000000000..6a02746bc44 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp @@ -0,0 +1,57 @@ +#include "zero_level.h" + +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +TConclusionStatus TZeroLevelConstructor::DoDeserializeFromJson(const NJson::TJsonValue& json) { + if (json.Has("portions_live_duration")) { + const auto& jsonValue = json["portions_live_duration"]; + if (!jsonValue.IsString()) { + return TConclusionStatus::Fail("incorrect portions_live_duration value (have to be similar as 10s, 20m, 30d, etc)"); + } + TDuration d; + if (!TDuration::TryParse(jsonValue.GetString(), d)) { + return TConclusionStatus::Fail("cannot parse portions_live_duration value " + jsonValue.GetString()); + } + PortionsLiveDuration = d; + } + if (json.Has("expected_blobs_size")) { + const auto& jsonValue = json["expected_blobs_size"]; + if (!jsonValue.IsUInteger()) { + return TConclusionStatus::Fail("incorrect expected_blobs_size value (have to be unsigned int)"); + } + ExpectedBlobsSize = jsonValue.GetUInteger(); + } + return TConclusionStatus::Success(); +} + +bool TZeroLevelConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) { + if (!proto.HasZeroLevel()) { + return true; + } + if (proto.GetZeroLevel().HasPortionsLiveDurationSeconds()) { + PortionsLiveDuration = TDuration::Seconds(proto.GetZeroLevel().GetPortionsLiveDurationSeconds()); + } + if (proto.GetZeroLevel().HasExpectedBlobsSize()) { + ExpectedBlobsSize = proto.GetZeroLevel().GetExpectedBlobsSize(); + } + return true; +} + +void TZeroLevelConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const { + if (PortionsLiveDuration) { + proto.MutableZeroLevel()->SetPortionsLiveDurationSeconds(PortionsLiveDuration->Seconds()); + } + if (ExpectedBlobsSize) { + proto.MutableZeroLevel()->SetExpectedBlobsSize(*ExpectedBlobsSize); + } +} + +std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> TZeroLevelConstructor::DoBuildLevel( + const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const { + return std::make_shared<TZeroLevelPortions>( + indexLevel, nextLevel, counters, PortionsLiveDuration.value_or(TDuration::Max()), ExpectedBlobsSize.value_or((ui64)1 << 20)); +} + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h new file mode 100644 index 00000000000..531c60f3690 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h @@ -0,0 +1,30 @@ +#pragma once +#include "constructor.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TZeroLevelConstructor: public ILevelConstructor { +public: + static TString GetClassNameStatic() { + return "Zero"; + } + +private: + std::optional<TDuration> PortionsLiveDuration; + std::optional<ui64> ExpectedBlobsSize; + + virtual std::shared_ptr<IPortionsLevel> DoBuildLevel( + const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const TLevelCounters& counters) const override; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const override; + + static const inline TFactory::TRegistrator<TZeroLevelConstructor> Registrator = TFactory::TRegistrator<TZeroLevelConstructor>(GetClassNameStatic()); + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp index 361a88c457a..b26a2b90d04 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp @@ -3,12 +3,14 @@ #include "optimizer.h" #include "zero_level.h" +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h> + #include <util/string/join.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { -TOptimizerPlanner::TOptimizerPlanner( - const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<arrow::Schema>& primaryKeysSchema) +TOptimizerPlanner::TOptimizerPlanner(const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager, + const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors) : TBase(pathId) , Counters(std::make_shared<TCounters>()) , StoragesManager(storagesManager) @@ -19,9 +21,19 @@ TOptimizerPlanner::TOptimizerPlanner( Levels.emplace_back( std::make_shared<TLevelPortions>(2, 0.9, maxPortionBlobBytes, nullptr, PortionsInfo, Counters->GetLevelCounters(2))); */ - Levels.emplace_back(std::make_shared<TZeroLevelPortions>(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max())); - Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max())); - Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180))); + if (levelConstructors.size()) { + std::shared_ptr<IPortionsLevel> nextLevel; + ui32 idx = levelConstructors.size(); + for (auto it = levelConstructors.rbegin(); it != levelConstructors.rend(); ++it) { + --idx; + Levels.emplace_back((*it)->BuildLevel(nextLevel, idx, Counters->GetLevelCounters(idx))); + } + } else { + Levels.emplace_back(std::make_shared<TZeroLevelPortions>(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max(), 1 << 20)); + Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max(), 1 << 20)); + Levels.emplace_back( + std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180), 1 << 20)); + } std::reverse(Levels.begin(), Levels.end()); RefreshWeights(); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h index 0f48b468069..0f576672243 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h @@ -4,6 +4,8 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { +class TLevelConstructorContainer; + class TOptimizerPlanner: public IOptimizerPlanner { private: using TBase = IOptimizerPlanner; @@ -144,8 +146,8 @@ public: return result; } - TOptimizerPlanner( - const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager, const std::shared_ptr<arrow::Schema>& primaryKeysSchema); + TOptimizerPlanner(const ui64 pathId, const std::shared_ptr<IStoragesManager>& storagesManager, + const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors); }; } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp index e8854f74261..18d09fe007a 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp @@ -26,7 +26,7 @@ ui64 TZeroLevelPortions::DoGetWeight() const { return 0; } if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) { - if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) { + if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < ExpectedBlobsSize) { return 0; } } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h index 7cdbf863b27..cd7385501e3 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h @@ -9,6 +9,7 @@ private: using TBase = IPortionsLevel; const TLevelCounters LevelCounters; const TDuration DurationToDrop; + const ui64 ExpectedBlobsSize; class TOrderedPortion { private: YDB_READONLY_DEF(TPortionInfo::TConstPtr, Portion); @@ -91,10 +92,11 @@ private: virtual TCompactionTaskData DoGetOptimizationTask() const override; public: - TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop) + TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop, const ui64 expectedBlobsSize) : TBase(levelIdx, nextLevel) , LevelCounters(levelCounters) , DurationToDrop(durationToDrop) + , ExpectedBlobsSize(expectedBlobsSize) { } }; |
