diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-23 16:46:11 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-23 17:38:50 +0300 |
commit | cb1f1ff293a64febd278ea89e86ccb57c9adfdb3 (patch) | |
tree | 0f8dba2a901d1493a295aa0a5cab0a9371c6fff4 | |
parent | 08405a23b9b6caeabe8d8e19f9d850b0fe4fe52e (diff) | |
download | ydb-cb1f1ff293a64febd278ea89e86ccb57c9adfdb3.tar.gz |
KIKIMR-19211: use buckets planner
11 files changed, 73 insertions, 25 deletions
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 2126a0042fd..2657daf200b 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -1,8 +1,6 @@ #include "granule.h" #include "storage.h" #include <library/cpp/actors/core/log.h> -#include "optimizer/intervals/optimizer.h" -#include "optimizer/levels/optimizer.h" #include "optimizer/lbuckets/optimizer.h" namespace NKikimr::NOlap { @@ -167,7 +165,7 @@ TGranuleMeta::TGranuleMeta(const ui64 pathId, std::shared_ptr<TGranulesStorage> , PortionInfoGuard(Owner->GetCounters().BuildPortionBlobsGuard()) { Y_ABORT_UNLESS(Owner); - OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(PathId, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey()); + OptimizerPlanner = std::make_shared<NStorageOptimizer::NBuckets::TOptimizerPlanner>(PathId, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey()); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h index 330a426a53b..7f8585834a4 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h @@ -198,7 +198,7 @@ private: return true; } public: - void Validate(const std::shared_ptr<TPortionInfo>& portion) const { + bool Validate(const std::shared_ptr<TPortionInfo>& portion) const { if (portion) { AFL_VERIFY(!PreActuals.contains(portion->GetPortionId())); AFL_VERIFY(!Actuals.contains(portion->GetPortionId())); @@ -235,6 +235,7 @@ public: AFL_VERIFY(!f.second.contains(i.first)); } } + return true; } bool IsEmpty() const { @@ -519,6 +520,14 @@ public: const ui64 count = BucketInfo.GetCount() + ((mainPortion && !isFinal) ? 1 : 0); const ui64 recordsCount = BucketInfo.GetRecordsCount() + ((mainPortion && !isFinal) ? mainPortion->GetRecordsCount() : 0); const ui64 sumBytes = BucketInfo.GetBytes() + ((mainPortion && !isFinal) ? mainPortion->GetBlobBytes() : 0); + if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Disable) { + return 0; + } + const ui64 weight = (10000000000.0 * count - sumBytes) * (isFinal ? 1 : 10); + if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Force) { + return (count > 1) ? weight : 0; + } + if (count > 1 && (sumBytes > 32 * 1024 * 1024 || !isFinal || count > 100 || recordsCount > 100000)) { return (10000000000.0 * count - sumBytes) * (isFinal ? 1 : 10); } else { @@ -562,8 +571,8 @@ private: } } - void Validate() const { - Others.Validate(MainPortion); + bool Validate() const { + return Others.Validate(MainPortion); } public: class TModificationGuard: TNonCopyable { @@ -577,11 +586,11 @@ public: , IsEmptyOthers(Owner.Others.ActualsEmpty()) , HasNextBorder(Owner.NextBorder) { -// Owner.Validate(); + AFL_VERIFY_DEBUG(Owner.Validate()); } ~TModificationGuard() { -// Owner.Validate(); + AFL_VERIFY_DEBUG(Owner.Validate()); if (!Owner.MainPortion) { return; } @@ -957,7 +966,7 @@ public: } else { if (itFrom == Buckets.end()) { const TDuration freshness = now - TInstant::MilliSeconds(portion->RecordSnapshotMax().GetPlanStep()); - if (freshness < GetCommonFreshnessCheckDuration()) { + if (freshness < GetCommonFreshnessCheckDuration() || portion->GetMeta().GetProduced() == NPortion::EProduced::INSERTED) { AddOther(portion, now); return; } @@ -1017,7 +1026,11 @@ protected: Buckets.Actualize(currentInstant); } virtual TOptimizationPriority DoGetUsefulMetric() const override { - return TOptimizationPriority::Critical(Buckets.GetWeight()); + if (Buckets.GetWeight()) { + return TOptimizationPriority::Critical(Buckets.GetWeight()); + } else { + return TOptimizationPriority::Zero(); + } } virtual TString DoDebugString() const override { return Buckets.DebugString(); diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt index 5810287df79..b2ebd6d0052 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt @@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC yql-sql-pg_dummy core-arrow_kernels-request core-testlib-default + columnshard-hooks-abstract + columnshard-hooks-testing json2_udf ) target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt index 3637b62e837..c60d79794a3 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt @@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC yql-sql-pg_dummy core-arrow_kernels-request core-testlib-default + columnshard-hooks-abstract + columnshard-hooks-testing json2_udf ) target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt index 62b2cbc7422..9748cc589af 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt @@ -29,6 +29,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC yql-sql-pg_dummy core-arrow_kernels-request core-testlib-default + columnshard-hooks-abstract + columnshard-hooks-testing json2_udf ) target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt index fc4d07a486b..687550a8606 100644 --- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt @@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC yql-sql-pg_dummy core-arrow_kernels-request core-testlib-default + columnshard-hooks-abstract + columnshard-hooks-testing json2_udf ) target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE diff --git a/ydb/core/tx/columnshard/engines/ut/ya.make b/ydb/core/tx/columnshard/engines/ut/ya.make index 04f31654b96..13fc351c5a5 100644 --- a/ydb/core/tx/columnshard/engines/ut/ya.make +++ b/ydb/core/tx/columnshard/engines/ut/ya.make @@ -23,6 +23,8 @@ PEERDIR( ydb/library/yql/sql/pg_dummy ydb/library/yql/core/arrow_kernels/request ydb/core/testlib/default + ydb/core/tx/columnshard/hooks/abstract + ydb/core/tx/columnshard/hooks/testing ydb/library/yql/udfs/common/json2 ) diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 96e0e925133..1e3d7f712ff 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -6,6 +6,7 @@ #include <ydb/core/tx/columnshard/columnshard_ut_common.h> #include <ydb/core/tx/columnshard/engines/changes/compaction.h> #include <ydb/core/tx/columnshard/blobs_action/bs/storage.h> +#include <ydb/core/tx/columnshard/hooks/testing/controller.h> namespace NKikimr { @@ -14,6 +15,7 @@ using namespace NOlap; namespace NTypeIds = NScheme::NTypeIds; using TTypeId = NScheme::TTypeId; using TTypeInfo = NScheme::TTypeInfo; +using TDefaultTestsController = NKikimr::NYDBTest::NColumnShard::TController; namespace { @@ -572,6 +574,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { Y_UNIT_TEST(IndexWriteOverload) { TTestDbWrapper db; + auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>(); TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);; ui64 pathId = 1; diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 093ccd3031e..4339a6d1a61 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -25,6 +25,12 @@ class RecordBatch; namespace NKikimr::NYDBTest { +enum class EOptimizerCompactionWeightControl { + Disable, + Default, + Force +}; + class ICSController { private: YDB_READONLY(TAtomicCounter, OnSortingPolicyCounter, 0); @@ -64,6 +70,9 @@ public: bool OnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) { return DoOnStartCompaction(changes); } + virtual EOptimizerCompactionWeightControl GetCompactionControl() const { + return EOptimizerCompactionWeightControl::Force; + } virtual TDuration GetTTLDefaultWaitingDuration(const TDuration defaultValue) const { return defaultValue; } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 41585f2bb66..60160940fe4 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -10,6 +10,7 @@ private: YDB_ACCESSOR(std::optional<TDuration>, GuaranteeIndexationInterval, TDuration::Zero()); YDB_ACCESSOR(std::optional<ui64>, GuaranteeIndexationStartBytesLimit, 0); YDB_ACCESSOR(std::optional<TDuration>, OptimizerFreshnessCheckDuration, TDuration::Zero()); + EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force; protected: virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) override; virtual bool DoOnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) override; @@ -22,8 +23,15 @@ protected: virtual TDuration GetOptimizerFreshnessCheckDuration(const TDuration defaultValue) const override { return OptimizerFreshnessCheckDuration.value_or(defaultValue); } + virtual EOptimizerCompactionWeightControl GetCompactionControl() const override { + return CompactionControl; + } public: + void SetCompactionControl(const EOptimizerCompactionWeightControl value) { + CompactionControl = value; + } + bool HasPKSortingOnly() const; bool HasCompactions() const { return Compactions.Val(); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index ec36f237ee0..28a7809389c 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2537,7 +2537,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { --planStep; --txId; - ui32 numRows = numWrites * (triggerPortionSize - overlapSize) + overlapSize; + const ui32 fullNumRows = numWrites * (triggerPortionSize - overlapSize) + overlapSize; for (ui32 i = 0; i < 2; ++i) { { @@ -2570,11 +2570,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { if (testBlobOptions.SameValueColumns.contains("timestamp")) { UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message")); - UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "message")); + UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "message")); } else { UNIT_ASSERT(isStrPk0 - ? DataHas<std::string>(readData, schema, { 0, numRows }, true, "timestamp") - : DataHas(readData, schema, { 0, numRows }, true, "timestamp")); + ? DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "timestamp") + : DataHas(readData, schema, { 0, fullNumRows}, true, "timestamp")); } } @@ -2584,9 +2584,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { std::vector<ui32> val9999 = { 99999 }; std::vector<ui32> val1M = { 1000000000 }; std::vector<ui32> val1M_1 = { 1000000001 }; - std::vector<ui32> valNumRows = { numRows }; - std::vector<ui32> valNumRows_1 = { numRows - 1 }; - std::vector<ui32> valNumRows_2 = { numRows - 2 }; + std::vector<ui32> valNumRows = {fullNumRows}; + std::vector<ui32> valNumRows_1 = {fullNumRows - 1 }; + std::vector<ui32> valNumRows_2 = {fullNumRows - 2 }; { UNIT_ASSERT(table.Pk.size() >= 2); @@ -2598,9 +2598,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { val9999 = { sameValue, 99999 }; val1M = { sameValue, 1000000000 }; val1M_1 = { sameValue, 1000000001 }; - valNumRows = { sameValue, numRows }; - valNumRows_1 = { sameValue, numRows - 1 }; - valNumRows_2 = { sameValue, numRows - 2 }; + valNumRows = { sameValue, fullNumRows}; + valNumRows_1 = { sameValue, fullNumRows - 1 }; + valNumRows_2 = { sameValue, fullNumRows - 2 }; } using TBorder = TTabletReadPredicateTest::TBorder; @@ -2632,7 +2632,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } - { // Get index stats ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0); auto scanInited = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanInitActor>(handle); @@ -2649,6 +2648,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 sumCompactedBytes = 0; ui64 sumCompactedRows = 0; + ui64 sumInsertedBytes = 0; + ui64 sumInsertedRows = 0; for (ui32 i = 0; i < batchStats->num_rows(); ++i) { auto paths = batchStats->GetColumnByName("PathId"); auto kinds = batchStats->GetColumnByName("Kind"); @@ -2671,16 +2672,22 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { sumCompactedRows += numRows; //UNIT_ASSERT(numRawBytes > numBytes); } + if (kind == (ui32)NOlap::NPortion::EProduced::INSERTED) { + sumInsertedBytes += numBytes; + sumInsertedRows += numRows; + //UNIT_ASSERT(numRawBytes > numBytes); + } } else { UNIT_ASSERT_VALUES_EQUAL(numRows, 0); UNIT_ASSERT_VALUES_EQUAL(numBytes, 0); UNIT_ASSERT_VALUES_EQUAL(numRawBytes, 0); } } - const ui64 fullSize = (triggerPortionSize - overlapSize) * numWrites + overlapSize; - Cerr << "compacted=" << sumCompactedRows << ";expected=" << fullSize << ";" << Endl; - UNIT_ASSERT(sumCompactedRows == fullSize); - UNIT_ASSERT(sumCompactedBytes > sumCompactedRows); + Cerr << "compacted=" << sumCompactedRows << ";inserted=" << sumInsertedRows << ";expected=" << fullNumRows << ";" << Endl; + RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); + UNIT_ASSERT(sumCompactedRows + sumInsertedRows == fullNumRows); + UNIT_ASSERT(sumCompactedRows > sumInsertedRows); + UNIT_ASSERT(sumCompactedBytes > sumInsertedBytes); } } |