aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorstanly <stanly@yandex-team.com>2023-05-19 15:55:20 +0300
committerstanly <stanly@yandex-team.com>2023-05-19 15:55:20 +0300
commitefd64e4742f72120932dbe9aca8837dcf1759f51 (patch)
tree03e5bfb80610d064e5f19e5d6759e1a5204924b6
parentf6ea2dff0afb9552f0f687497d5fad3b932ce0d1 (diff)
downloadydb-efd64e4742f72120932dbe9aca8837dcf1759f51.tar.gz
set count of compaction workers to 2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp13
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp6
3 files changed, 15 insertions, 6 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 392f3343693..8ebf8d9a29d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -44,7 +44,7 @@ IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui6
IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
struct TSettings {
- static constexpr ui32 MAX_ACTIVE_COMPACTIONS = 1;
+ static constexpr ui32 MAX_ACTIVE_COMPACTIONS = 2;
static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index cba682b3eca..78cd9eee330 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -524,6 +524,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std:
if (changes->CompactionInfo->InGranule) {
const TSnapshot completedSnap = std::max(LastSnapshot, outdatedSnapshot);
if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, Limits, completedSnap, changes->MergeBorders)) {
+ // Return granule to Compation list. This is equal to single compaction worker behaviour.
+ CompactionGranules.insert(granule);
return {};
}
} else {
@@ -796,6 +798,12 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE
}
if (!ApplyChanges(db, *changes, snapshot, false)) { // validate only
+ if (changes->IsCompaction()) {
+ // Return granule to Compation list. This is equal to single compaction worker behaviour.
+ for (const auto& portionInfo : changes->SwitchedPortions) {
+ CompactionGranules.insert(portionInfo.Granule());
+ }
+ }
return false;
}
bool ok = ApplyChanges(db, *changes, snapshot, true);
@@ -1337,9 +1345,8 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact
ui64 granule = 0;
bool inGranule = true;
- auto it = CompactionGranules.upper_bound(lastCompactedGranule);
- while (!CompactionGranules.empty()) {
+ for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty();) {
// Start from the beggining if the end is reached.
if (it == CompactionGranules.end()) {
it = CompactionGranules.begin();
@@ -1353,9 +1360,11 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact
if (NeedSplit(gi->second->Portions, Limits, inserted)) {
inGranule = false;
granule = *it;
+ CompactionGranules.erase(it);
break;
} else if (inserted) {
granule = *it;
+ CompactionGranules.erase(it);
break;
}
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 3c355e76833..1438eba2784 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1142,7 +1142,7 @@ void TestCompactionInGranuleImpl(bool reboots,
UNIT_ASSERT(readStats.GetIndexBatches() > 0);
UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0);
UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
- UNIT_ASSERT(readStats.GetIndexPortions() <= 2 * TSettings::MAX_ACTIVE_COMPACTIONS); // got compaction
+ UNIT_ASSERT(readStats.GetIndexPortions() <= 2); // got compaction
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
@@ -2210,7 +2210,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT(readStats.GetIndexBatches() > 0);
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3 * TSettings::MAX_ACTIVE_COMPACTIONS); // got 2 split compactions per worker
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x);
}
@@ -2603,7 +2603,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
--planStep;
--txId;
- UNIT_ASSERT_VALUES_EQUAL(compactionsHappened, 3 * TSettings::MAX_ACTIVE_COMPACTIONS); // we catch it three times per action
+ UNIT_ASSERT_VALUES_EQUAL(compactionsHappened, 3); // we catch it three times per action
ui64 previousCompactionsHappened = compactionsHappened;
ui64 previousCleanupsHappened = cleanupsHappened;