diff options
author | stanly <stanly@yandex-team.com> | 2023-05-19 15:55:20 +0300 |
---|---|---|
committer | stanly <stanly@yandex-team.com> | 2023-05-19 15:55:20 +0300 |
commit | efd64e4742f72120932dbe9aca8837dcf1759f51 (patch) | |
tree | 03e5bfb80610d064e5f19e5d6759e1a5204924b6 | |
parent | f6ea2dff0afb9552f0f687497d5fad3b932ce0d1 (diff) | |
download | ydb-efd64e4742f72120932dbe9aca8837dcf1759f51.tar.gz |
set count of compaction workers to 2
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 6 |
3 files changed, 15 insertions, 6 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 392f3343693..8ebf8d9a29d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -44,7 +44,7 @@ IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui6 IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); struct TSettings { - static constexpr ui32 MAX_ACTIVE_COMPACTIONS = 1; + static constexpr ui32 MAX_ACTIVE_COMPACTIONS = 2; static constexpr ui32 MAX_INDEXATIONS_TO_SKIP = 16; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index cba682b3eca..78cd9eee330 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -524,6 +524,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(std: if (changes->CompactionInfo->InGranule) { const TSnapshot completedSnap = std::max(LastSnapshot, outdatedSnapshot); if (!InitInGranuleMerge(changes->SrcGranule->Mark, changes->SwitchedPortions, Limits, completedSnap, changes->MergeBorders)) { + // Return granule to Compation list. This is equal to single compaction worker behaviour. + CompactionGranules.insert(granule); return {}; } } else { @@ -796,6 +798,12 @@ bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnE } if (!ApplyChanges(db, *changes, snapshot, false)) { // validate only + if (changes->IsCompaction()) { + // Return granule to Compation list. This is equal to single compaction worker behaviour. + for (const auto& portionInfo : changes->SwitchedPortions) { + CompactionGranules.insert(portionInfo.Granule()); + } + } return false; } bool ok = ApplyChanges(db, *changes, snapshot, true); @@ -1337,9 +1345,8 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact ui64 granule = 0; bool inGranule = true; - auto it = CompactionGranules.upper_bound(lastCompactedGranule); - while (!CompactionGranules.empty()) { + for (auto it = CompactionGranules.upper_bound(lastCompactedGranule); !CompactionGranules.empty();) { // Start from the beggining if the end is reached. if (it == CompactionGranules.end()) { it = CompactionGranules.begin(); @@ -1353,9 +1360,11 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact if (NeedSplit(gi->second->Portions, Limits, inserted)) { inGranule = false; granule = *it; + CompactionGranules.erase(it); break; } else if (inserted) { granule = *it; + CompactionGranules.erase(it); break; } diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 3c355e76833..1438eba2784 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1142,7 +1142,7 @@ void TestCompactionInGranuleImpl(bool reboots, UNIT_ASSERT(readStats.GetIndexBatches() > 0); UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message" - UNIT_ASSERT(readStats.GetIndexPortions() <= 2 * TSettings::MAX_ACTIVE_COMPACTIONS); // got compaction + UNIT_ASSERT(readStats.GetIndexPortions() <= 2); // got compaction RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } @@ -2210,7 +2210,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { UNIT_ASSERT(readStats.GetIndexBatches() > 0); //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message" - UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3 * TSettings::MAX_ACTIVE_COMPACTIONS); // got 2 split compactions per worker + UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x); } @@ -2603,7 +2603,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { --planStep; --txId; - UNIT_ASSERT_VALUES_EQUAL(compactionsHappened, 3 * TSettings::MAX_ACTIVE_COMPACTIONS); // we catch it three times per action + UNIT_ASSERT_VALUES_EQUAL(compactionsHappened, 3); // we catch it three times per action ui64 previousCompactionsHappened = compactionsHappened; ui64 previousCleanupsHappened = cleanupsHappened; |