aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-23 16:46:11 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-23 17:38:50 +0300
commitcb1f1ff293a64febd278ea89e86ccb57c9adfdb3 (patch)
tree0f8dba2a901d1493a295aa0a5cab0a9371c6fff4
parent08405a23b9b6caeabe8d8e19f9d850b0fe4fe52e (diff)
downloadydb-cb1f1ff293a64febd278ea89e86ccb57c9adfdb3.tar.gz
KIKIMR-19211: use buckets planner
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h27
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ya.make2
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp3
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/abstract.h9
-rw-r--r--ydb/core/tx/columnshard/hooks/testing/controller.h8
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp37
11 files changed, 73 insertions, 25 deletions
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 2126a0042fd..2657daf200b 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -1,8 +1,6 @@
#include "granule.h"
#include "storage.h"
#include <library/cpp/actors/core/log.h>
-#include "optimizer/intervals/optimizer.h"
-#include "optimizer/levels/optimizer.h"
#include "optimizer/lbuckets/optimizer.h"
namespace NKikimr::NOlap {
@@ -167,7 +165,7 @@ TGranuleMeta::TGranuleMeta(const ui64 pathId, std::shared_ptr<TGranulesStorage>
, PortionInfoGuard(Owner->GetCounters().BuildPortionBlobsGuard())
{
Y_ABORT_UNLESS(Owner);
- OptimizerPlanner = std::make_shared<NStorageOptimizer::NLevels::TLevelsOptimizerPlanner>(PathId, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey());
+ OptimizerPlanner = std::make_shared<NStorageOptimizer::NBuckets::TOptimizerPlanner>(PathId, owner->GetStoragesManager(), versionedIndex.GetLastSchema()->GetIndexInfo().GetReplaceKey());
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
index 330a426a53b..7f8585834a4 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
@@ -198,7 +198,7 @@ private:
return true;
}
public:
- void Validate(const std::shared_ptr<TPortionInfo>& portion) const {
+ bool Validate(const std::shared_ptr<TPortionInfo>& portion) const {
if (portion) {
AFL_VERIFY(!PreActuals.contains(portion->GetPortionId()));
AFL_VERIFY(!Actuals.contains(portion->GetPortionId()));
@@ -235,6 +235,7 @@ public:
AFL_VERIFY(!f.second.contains(i.first));
}
}
+ return true;
}
bool IsEmpty() const {
@@ -519,6 +520,14 @@ public:
const ui64 count = BucketInfo.GetCount() + ((mainPortion && !isFinal) ? 1 : 0);
const ui64 recordsCount = BucketInfo.GetRecordsCount() + ((mainPortion && !isFinal) ? mainPortion->GetRecordsCount() : 0);
const ui64 sumBytes = BucketInfo.GetBytes() + ((mainPortion && !isFinal) ? mainPortion->GetBlobBytes() : 0);
+ if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Disable) {
+ return 0;
+ }
+ const ui64 weight = (10000000000.0 * count - sumBytes) * (isFinal ? 1 : 10);
+ if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Force) {
+ return (count > 1) ? weight : 0;
+ }
+
if (count > 1 && (sumBytes > 32 * 1024 * 1024 || !isFinal || count > 100 || recordsCount > 100000)) {
return (10000000000.0 * count - sumBytes) * (isFinal ? 1 : 10);
} else {
@@ -562,8 +571,8 @@ private:
}
}
- void Validate() const {
- Others.Validate(MainPortion);
+ bool Validate() const {
+ return Others.Validate(MainPortion);
}
public:
class TModificationGuard: TNonCopyable {
@@ -577,11 +586,11 @@ public:
, IsEmptyOthers(Owner.Others.ActualsEmpty())
, HasNextBorder(Owner.NextBorder)
{
-// Owner.Validate();
+ AFL_VERIFY_DEBUG(Owner.Validate());
}
~TModificationGuard() {
-// Owner.Validate();
+ AFL_VERIFY_DEBUG(Owner.Validate());
if (!Owner.MainPortion) {
return;
}
@@ -957,7 +966,7 @@ public:
} else {
if (itFrom == Buckets.end()) {
const TDuration freshness = now - TInstant::MilliSeconds(portion->RecordSnapshotMax().GetPlanStep());
- if (freshness < GetCommonFreshnessCheckDuration()) {
+ if (freshness < GetCommonFreshnessCheckDuration() || portion->GetMeta().GetProduced() == NPortion::EProduced::INSERTED) {
AddOther(portion, now);
return;
}
@@ -1017,7 +1026,11 @@ protected:
Buckets.Actualize(currentInstant);
}
virtual TOptimizationPriority DoGetUsefulMetric() const override {
- return TOptimizationPriority::Critical(Buckets.GetWeight());
+ if (Buckets.GetWeight()) {
+ return TOptimizationPriority::Critical(Buckets.GetWeight());
+ } else {
+ return TOptimizationPriority::Zero();
+ }
}
virtual TString DoDebugString() const override {
return Buckets.DebugString();
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt
index 5810287df79..b2ebd6d0052 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt
@@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
yql-sql-pg_dummy
core-arrow_kernels-request
core-testlib-default
+ columnshard-hooks-abstract
+ columnshard-hooks-testing
json2_udf
)
target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt
index 3637b62e837..c60d79794a3 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt
@@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
yql-sql-pg_dummy
core-arrow_kernels-request
core-testlib-default
+ columnshard-hooks-abstract
+ columnshard-hooks-testing
json2_udf
)
target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt
index 62b2cbc7422..9748cc589af 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt
@@ -29,6 +29,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
yql-sql-pg_dummy
core-arrow_kernels-request
core-testlib-default
+ columnshard-hooks-abstract
+ columnshard-hooks-testing
json2_udf
)
target_link_options(ydb-core-tx-columnshard-engines-ut PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt
index fc4d07a486b..687550a8606 100644
--- a/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/ut/CMakeLists.windows-x86_64.txt
@@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-columnshard-engines-ut PUBLIC
yql-sql-pg_dummy
core-arrow_kernels-request
core-testlib-default
+ columnshard-hooks-abstract
+ columnshard-hooks-testing
json2_udf
)
target_sources(ydb-core-tx-columnshard-engines-ut PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/ut/ya.make b/ydb/core/tx/columnshard/engines/ut/ya.make
index 04f31654b96..13fc351c5a5 100644
--- a/ydb/core/tx/columnshard/engines/ut/ya.make
+++ b/ydb/core/tx/columnshard/engines/ut/ya.make
@@ -23,6 +23,8 @@ PEERDIR(
ydb/library/yql/sql/pg_dummy
ydb/library/yql/core/arrow_kernels/request
ydb/core/testlib/default
+ ydb/core/tx/columnshard/hooks/abstract
+ ydb/core/tx/columnshard/hooks/testing
ydb/library/yql/udfs/common/json2
)
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 96e0e925133..1e3d7f712ff 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
#include <ydb/core/tx/columnshard/blobs_action/bs/storage.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
namespace NKikimr {
@@ -14,6 +15,7 @@ using namespace NOlap;
namespace NTypeIds = NScheme::NTypeIds;
using TTypeId = NScheme::TTypeId;
using TTypeInfo = NScheme::TTypeInfo;
+using TDefaultTestsController = NKikimr::NYDBTest::NColumnShard::TController;
namespace {
@@ -572,6 +574,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
Y_UNIT_TEST(IndexWriteOverload) {
TTestDbWrapper db;
+ auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);;
ui64 pathId = 1;
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 093ccd3031e..4339a6d1a61 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -25,6 +25,12 @@ class RecordBatch;
namespace NKikimr::NYDBTest {
+enum class EOptimizerCompactionWeightControl {
+ Disable,
+ Default,
+ Force
+};
+
class ICSController {
private:
YDB_READONLY(TAtomicCounter, OnSortingPolicyCounter, 0);
@@ -64,6 +70,9 @@ public:
bool OnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) {
return DoOnStartCompaction(changes);
}
+ virtual EOptimizerCompactionWeightControl GetCompactionControl() const {
+ return EOptimizerCompactionWeightControl::Force;
+ }
virtual TDuration GetTTLDefaultWaitingDuration(const TDuration defaultValue) const {
return defaultValue;
}
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index 41585f2bb66..60160940fe4 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -10,6 +10,7 @@ private:
YDB_ACCESSOR(std::optional<TDuration>, GuaranteeIndexationInterval, TDuration::Zero());
YDB_ACCESSOR(std::optional<ui64>, GuaranteeIndexationStartBytesLimit, 0);
YDB_ACCESSOR(std::optional<TDuration>, OptimizerFreshnessCheckDuration, TDuration::Zero());
+ EOptimizerCompactionWeightControl CompactionControl = EOptimizerCompactionWeightControl::Force;
protected:
virtual bool DoOnAfterFilterAssembling(const std::shared_ptr<arrow::RecordBatch>& batch) override;
virtual bool DoOnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) override;
@@ -22,8 +23,15 @@ protected:
virtual TDuration GetOptimizerFreshnessCheckDuration(const TDuration defaultValue) const override {
return OptimizerFreshnessCheckDuration.value_or(defaultValue);
}
+ virtual EOptimizerCompactionWeightControl GetCompactionControl() const override {
+ return CompactionControl;
+ }
public:
+ void SetCompactionControl(const EOptimizerCompactionWeightControl value) {
+ CompactionControl = value;
+ }
+
bool HasPKSortingOnly() const;
bool HasCompactions() const {
return Compactions.Val();
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index ec36f237ee0..28a7809389c 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2537,7 +2537,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
--planStep;
--txId;
- ui32 numRows = numWrites * (triggerPortionSize - overlapSize) + overlapSize;
+ const ui32 fullNumRows = numWrites * (triggerPortionSize - overlapSize) + overlapSize;
for (ui32 i = 0; i < 2; ++i) {
{
@@ -2570,11 +2570,11 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
if (testBlobOptions.SameValueColumns.contains("timestamp")) {
UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message"));
- UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, numRows }, true, "message"));
+ UNIT_ASSERT(DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "message"));
} else {
UNIT_ASSERT(isStrPk0
- ? DataHas<std::string>(readData, schema, { 0, numRows }, true, "timestamp")
- : DataHas(readData, schema, { 0, numRows }, true, "timestamp"));
+ ? DataHas<std::string>(readData, schema, { 0, fullNumRows}, true, "timestamp")
+ : DataHas(readData, schema, { 0, fullNumRows}, true, "timestamp"));
}
}
@@ -2584,9 +2584,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
std::vector<ui32> val9999 = { 99999 };
std::vector<ui32> val1M = { 1000000000 };
std::vector<ui32> val1M_1 = { 1000000001 };
- std::vector<ui32> valNumRows = { numRows };
- std::vector<ui32> valNumRows_1 = { numRows - 1 };
- std::vector<ui32> valNumRows_2 = { numRows - 2 };
+ std::vector<ui32> valNumRows = {fullNumRows};
+ std::vector<ui32> valNumRows_1 = {fullNumRows - 1 };
+ std::vector<ui32> valNumRows_2 = {fullNumRows - 2 };
{
UNIT_ASSERT(table.Pk.size() >= 2);
@@ -2598,9 +2598,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
val9999 = { sameValue, 99999 };
val1M = { sameValue, 1000000000 };
val1M_1 = { sameValue, 1000000001 };
- valNumRows = { sameValue, numRows };
- valNumRows_1 = { sameValue, numRows - 1 };
- valNumRows_2 = { sameValue, numRows - 2 };
+ valNumRows = { sameValue, fullNumRows};
+ valNumRows_1 = { sameValue, fullNumRows - 1 };
+ valNumRows_2 = { sameValue, fullNumRows - 2 };
}
using TBorder = TTabletReadPredicateTest::TBorder;
@@ -2632,7 +2632,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
-
{ // Get index stats
ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0);
auto scanInited = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanInitActor>(handle);
@@ -2649,6 +2648,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 sumCompactedBytes = 0;
ui64 sumCompactedRows = 0;
+ ui64 sumInsertedBytes = 0;
+ ui64 sumInsertedRows = 0;
for (ui32 i = 0; i < batchStats->num_rows(); ++i) {
auto paths = batchStats->GetColumnByName("PathId");
auto kinds = batchStats->GetColumnByName("Kind");
@@ -2671,16 +2672,22 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
sumCompactedRows += numRows;
//UNIT_ASSERT(numRawBytes > numBytes);
}
+ if (kind == (ui32)NOlap::NPortion::EProduced::INSERTED) {
+ sumInsertedBytes += numBytes;
+ sumInsertedRows += numRows;
+ //UNIT_ASSERT(numRawBytes > numBytes);
+ }
} else {
UNIT_ASSERT_VALUES_EQUAL(numRows, 0);
UNIT_ASSERT_VALUES_EQUAL(numBytes, 0);
UNIT_ASSERT_VALUES_EQUAL(numRawBytes, 0);
}
}
- const ui64 fullSize = (triggerPortionSize - overlapSize) * numWrites + overlapSize;
- Cerr << "compacted=" << sumCompactedRows << ";expected=" << fullSize << ";" << Endl;
- UNIT_ASSERT(sumCompactedRows == fullSize);
- UNIT_ASSERT(sumCompactedBytes > sumCompactedRows);
+ Cerr << "compacted=" << sumCompactedRows << ";inserted=" << sumInsertedRows << ";expected=" << fullNumRows << ";" << Endl;
+ RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
+ UNIT_ASSERT(sumCompactedRows + sumInsertedRows == fullNumRows);
+ UNIT_ASSERT(sumCompactedRows > sumInsertedRows);
+ UNIT_ASSERT(sumCompactedBytes > sumInsertedBytes);
}
}