diff options
| author | Evgeniy Ivanov <[email protected]> | 2022-03-31 22:04:30 +0300 |
|---|---|---|
| committer | Evgeniy Ivanov <[email protected]> | 2022-03-31 22:04:30 +0300 |
| commit | d2fa7a39fd858307d4d5a6e2d548c4a6ee4f496f (patch) | |
| tree | 58c01c53ec453295133c94636a8808d77af63a81 | |
| parent | fa4b82d3614d68d8c7ede2bf4f58ffa9f9487784 (diff) | |
KIKIMR-9748: don't background compact neither source nor dst copy tables
ref:43925608e7050d2b17a305e8335b75095ed42a22
25 files changed, 461 insertions, 119 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index db413eaceab..26e35e98c5b 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -109,6 +109,7 @@ enum ECumulativeCounters { COUNTER_TX_BACKGROUND_COMPACTION_FAILED_BORROWED = 83 [(CounterOpts) = {Name: "TxCompactTableFailedBorrowed"}]; COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START = 84 [(CounterOpts) = {Name: "TxCompactTableFailedStart"}]; COUNTER_FULL_COMPACTION_DONE = 85 [(CounterOpts) = {Name: "FullCompactionCount"}]; + COUNTER_TX_BACKGROUND_COMPACTION_FAILED_LOANED = 86 [(CounterOpts) = {Name: "TxCompactTableFailedLoaned"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 70378fa470e..7484b511b88 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -154,6 +154,7 @@ enum ESimpleCounters { COUNTER_COMPACTION_QUEUE_RUNNING = 125 [(CounterOpts) = {Name: "BackgroundCompactionQueueRunning"}]; COUNTER_COMPACTION_QUEUE_WAITING_REPEAT = 126 [(CounterOpts) = {Name: "BackgroundCompactionQueueWaitingRepeat"}]; COUNTER_SHARDS_WITH_BORROWED_DATA = 127 [(CounterOpts) = {Name: "TableShardsWithBorrowedData"}]; + COUNTER_SHARDS_WITH_LOANED_DATA = 128 [(CounterOpts) = {Name: "TableShardsWithLoanedData"}]; } enum ECumulativeCounters { @@ -244,6 +245,7 @@ enum ECumulativeCounters { COUNTER_BACKGROUND_COMPACTION_BORROWED = 73 [(CounterOpts) = {Name: "BackgroundCompactionBorrowed"}]; COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED = 74 [(CounterOpts) = {Name: "BackgroundCompactionNotNeeded"}]; + COUNTER_BACKGROUND_COMPACTION_LOANED = 75 [(CounterOpts) = {Name: "BackgroundCompactionLoaned"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/table_stats.proto b/ydb/core/protos/table_stats.proto index a555e0b659d..a2a22e8ff54 100644 --- a/ydb/core/protos/table_stats.proto +++ b/ydb/core/protos/table_stats.proto @@ -43,4 +43,7 @@ message TTableStats { // seconds since epoch optional uint64 LastFullCompactionTs = 28; + + // i.e. this shard lent to other shards + optional bool HasLoanedParts = 29; } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 68c92ea5aad..e197ecf4662 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1400,7 +1400,7 @@ message TEvKqpScan { message TEvCompactTable { optional NKikimrProto.TPathID PathId = 1; - optional bool CompactBorrowed = 2; + optional bool CompactBorrowed = 2; // i.e. taken optional bool CompactSinglePartedShards = 3; } @@ -1410,6 +1410,7 @@ message TEvCompactTableResult { NOT_NEEDED = 1; FAILED = 2; BORROWED = 3; + LOANED = 4; }; optional uint64 TabletId = 1; @@ -1427,6 +1428,7 @@ message TEvGetCompactTableStats { message TEvGetCompactTableStatsResult { optional uint64 BackgroundCompactionRequests = 1; + optional uint64 BackgroundCompactionCount = 2; } // TEvRead is used to request multiple queries from a shard and diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 7f8c5550ad4..91f3a137bc1 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -3798,6 +3798,12 @@ THashMap<TLogoBlobID, TVector<ui64>> TExecutor::GetBorrowedParts() const { return { }; } +bool TExecutor::HasLoanedParts() const { + if (BorrowLogic) + return BorrowLogic->HasLoanedParts(); + return false; +} + const TExecutorStats& TExecutor::GetStats() const { return *Stats; } diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 3a61e5385dd..9e72b436cfc 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -645,6 +645,7 @@ public: void SendUserAuxUpdateToFollowers(TString upd, const TActorContext &ctx) override; THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const override; + bool HasLoanedParts() const override; const TExecutorStats& GetStats() const override; NMetrics::TResourceMetrics* GetResourceMetrics() const override; diff --git a/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp b/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp index bd52264b10e..b0371b2701c 100644 --- a/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp +++ b/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp @@ -593,4 +593,13 @@ THashMap<TLogoBlobID, TVector<ui64>> TExecutorBorrowLogic::GetBorrowedParts() co return result; } +bool TExecutorBorrowLogic::HasLoanedParts() const { + for (const auto &xpair : BorrowedInfo) { + if (xpair.second.BorrowInfo.FullBorrow) { + return true; + } + } + return false; +} + }} diff --git a/ydb/core/tablet_flat/flat_executor_borrowlogic.h b/ydb/core/tablet_flat/flat_executor_borrowlogic.h index 5d990b49dc0..31021454854 100644 --- a/ydb/core/tablet_flat/flat_executor_borrowlogic.h +++ b/ydb/core/tablet_flat/flat_executor_borrowlogic.h @@ -168,6 +168,9 @@ public: // for cleanup THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const; + + // i.e. parts we own, but loaned to others + bool HasLoanedParts() const; }; }} diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index babe00d8635..f628f6cd42b 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -540,6 +540,7 @@ namespace NFlatExecutorSetup { // Returns parts owned by this tablet and borrowed by other tablets virtual THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const = 0; + virtual bool HasLoanedParts() const = 0; // This method lets executor know about new yellow channels virtual void OnYellowChannels(TVector<ui32> yellowMoveChannels, TVector<ui32> yellowStopChannels) = 0; diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp index 8d4c2b84eb8..317767d7368 100644 --- a/ydb/core/tx/datashard/datashard__compaction.cpp +++ b/ydb/core/tx/datashard/datashard__compaction.cpp @@ -66,7 +66,6 @@ public: const TUserTable& tableInfo = *it->second; const auto localTid = tableInfo.LocalTid; - // TODO: consider using metrics instead ++tableInfo.Stats.BackgroundCompactionRequests; auto stats = txc.DB.GetCompactionStats(localTid); @@ -82,11 +81,12 @@ public: if (hasBorrowed && !record.GetCompactBorrowed()) { // normally we should not receive requests to compact in this case + // but in some rare cases like schemeshard restart we can LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Background compaction of tablet# " << Self->TabletID() << " of path# " << pathId << ", requested from# " << Ev->Sender - << " contains borrowed data, failed"); + << " contains borrowed parts, failed"); Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION_FAILED_BORROWED); @@ -98,6 +98,25 @@ public: return true; } + if (Self->Executor()->HasLoanedParts()) { + // normally we should not receive requests to compact in this case + // but in some rare cases like schemeshard restart we can + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Background compaction of tablet# " << Self->TabletID() + << " of path# " << pathId + << ", requested from# " << Ev->Sender + << " contains loaned parts, failed"); + + Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION_FAILED_LOANED); + + auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>( + Self->TabletID(), + pathId, + NKikimrTxDataShard::TEvCompactTableResult::LOANED); + ctx.Send(Ev->Sender, std::move(response)); + return true; + } + bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0; bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0; if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) { @@ -133,6 +152,7 @@ public: Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION); Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, pathId, Ev->Sender)); + ++tableInfo.Stats.BackgroundCompactionCount; } else { // compaction failed, for now we don't care Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START); @@ -253,6 +273,7 @@ void TDataShard::Handle(TEvDataShard::TEvGetCompactTableStats::TPtr& ev, const T if (it != TableInfos.end()) { const TUserTable& tableInfo = *it->second; response->Record.SetBackgroundCompactionRequests(tableInfo.Stats.BackgroundCompactionRequests); + response->Record.SetBackgroundCompactionCount(tableInfo.Stats.BackgroundCompactionCount); } ctx.Send(ev->Sender, std::move(response)); diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp index 3ee32261af4..c618255c7f0 100644 --- a/ydb/core/tx/datashard/datashard__stats.cpp +++ b/ydb/core/tx/datashard/datashard__stats.cpp @@ -126,6 +126,7 @@ public: Result->Record.MutableTableStats()->SetPartCount(tableInfo.Stats.PartCount); Result->Record.MutableTableStats()->SetSearchHeight(tableInfo.Stats.SearchHeight); Result->Record.MutableTableStats()->SetLastFullCompactionTs(tableInfo.Stats.LastFullCompaction.Seconds()); + Result->Record.MutableTableStats()->SetHasLoanedParts(Self->Executor()->HasLoanedParts()); Result->Record.SetShardState(Self->State); for (const auto& pi : tableInfo.Stats.PartOwners) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 9184a788727..4373e74354f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2500,6 +2500,7 @@ protected: ev->Record.MutableTableStats()->SetPartCount(ti.Stats.PartCount); ev->Record.MutableTableStats()->SetSearchHeight(ti.Stats.SearchHeight); ev->Record.MutableTableStats()->SetLastFullCompactionTs(ti.Stats.LastFullCompaction.Seconds()); + ev->Record.MutableTableStats()->SetHasLoanedParts(Executor()->HasLoanedParts()); if (!ti.Stats.PartOwners.contains(TabletID())) { ev->Record.AddUserTablePartOwners(TabletID()); diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 4cc7845d8b4..2324be8b5d9 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -318,6 +318,7 @@ struct TUserTable : public TThrRefBase { ui64 DataSizeResolution = 0; ui64 RowCountResolution = 0; ui64 BackgroundCompactionRequests = 0; + ui64 BackgroundCompactionCount = 0; NTable::TKeyAccessSample AccessStats; void Update(NTable::TStats&& dataStats, ui64 indexSize, THashSet<ui64>&& partOwners, ui64 partCount, TInstant statsUpdateTime) { diff --git a/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp b/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp index 758508bd29e..22bcd5977a8 100644 --- a/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp +++ b/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp @@ -70,22 +70,68 @@ Y_UNIT_TEST_SUITE(DataShardBackgroundCompaction) { ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 100), (3, 300), (5, 500);"); auto shards = GetTableShards(server, sender, "/Root/table-1"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1UL); + const auto originalShard = shards.at(0); SetSplitMergePartCountLimit(&runtime, -1); ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards.at(0), 3); WaitTxNotification(server, sender, txId); - auto [tables, ownerId] = GetTables(server, shards.at(0)); - shards = GetTableShards(server, sender, "/Root/table-1"); UNIT_ASSERT(shards.size() > 1); for (auto shard: shards) { + auto [tables, ownerId] = GetTables(server, shard); auto compactionResult = CompactTable(server, tables["table-1"], shard, ownerId); + UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::BORROWED); + } + + { + auto [tables, ownerId] = GetTables(server, originalShard); + // try to compact original table (should be inactive now) + auto compactionResult = CompactTable(server, tables["table-1"], originalShard, ownerId); UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::FAILED); } } + Y_UNIT_TEST(ShouldNotCompactWhenCopyTable) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 100), (3, 300), (5, 500);"); + + auto txIdCopy = AsyncCreateCopyTable(server, sender, "/Root", "table-2", "/Root/table-1"); + WaitTxNotification(server, sender, txIdCopy); + + { + auto shards = GetTableShards(server, sender, "/Root/table-1"); + + auto [tables, ownerId] = GetTables(server, shards.at(0)); + auto compactionResult = CompactTable(server, tables["table-1"], shards.at(0), ownerId); + UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::LOANED); + } + + { + auto shards = GetTableShards(server, sender, "/Root/table-2"); + + auto [tables, ownerId] = GetTables(server, shards.at(0)); + auto compactionResult = CompactTable(server, tables["table-2"], shards.at(0), ownerId); + UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::BORROWED); + } + } + Y_UNIT_TEST(ShouldNotCompactEmptyTable) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index 3ed441d3963..820d7f2d2c1 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1203,6 +1203,31 @@ void CreateShardedTable(Tests::TServer::TPtr server, CreateShardedTable(server, sender, root, name, opts); } +ui64 AsyncCreateCopyTable( + Tests::TServer::TPtr server, + TActorId sender, + const TString &root, + const TString &name, + const TString &from) +{ + auto &runtime = *server->GetRuntime(); + + // Create table with four shards. + auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); + request->Record.SetExecTimeoutPeriod(Max<ui64>()); + auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable); + tx.SetWorkingDir(root); + auto &desc = *tx.MutableCreateTable(); + desc.SetName(name); + desc.SetCopyFromTable(from); + + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); + return ev->Get()->Record.GetTxId(); +} + NKikimrScheme::TEvDescribeSchemeResult DescribeTable(Tests::TServer::TPtr server, TActorId sender, const TString &path) diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 79794005400..7f903afe5f6 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -469,6 +469,12 @@ void CreateShardedTable(Tests::TServer::TPtr server, const NLocalDb::TCompactionPolicy* policy = nullptr, EShadowDataMode shadowData = EShadowDataMode::Default); +ui64 AsyncCreateCopyTable(Tests::TServer::TPtr server, + TActorId sender, + const TString &root, + const TString &name, + const TString &from); + TVector<ui64> GetTableShards(Tests::TServer::TPtr server, TActorId sender, const TString &path); diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index e7ca6bd002f..3a568664ae4 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -2519,33 +2519,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc, UseNe } } -namespace { - ui64 AsyncCreateCopyTable( - Tests::TServer::TPtr server, - TActorId sender, - const TString &root, - const TString &name, - const TString &from) - { - auto &runtime = *server->GetRuntime(); - - // Create table with four shards. - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable); - tx.SetWorkingDir(root); - auto &desc = *tx.MutableCreateTable(); - desc.SetName(name); - desc.SetCopyFromTable(from); - - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); - } -} - Y_UNIT_TEST_QUAD(TestCopyTableNoDeadlock, UseMvcc, UseNewEngine) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index 1ba33fc7d47..53be2907a8b 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -7,6 +7,8 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/util/operation_queue.h> +#include <library/cpp/actors/core/log.h> + // TODO: TOperationQueueWithTimer is a good candidate for core/util, but since // it uses actorlib_impl, which depends on core/util, it // can't be part of util. No other better place yet and since @@ -17,14 +19,14 @@ namespace NKikimr { // TODO: make part of util? namespace NOperationQueue { -template <typename T, typename TQueue, int Ev> +template <typename T, typename TQueue, int Ev, int LogServiceId> class TOperationQueueWithTimer - : public TActor<TOperationQueueWithTimer<T, TQueue, Ev>> + : public TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>> , public ITimer , public TOperationQueue<T, TQueue> { - using TThis = ::NKikimr::NOperationQueue::TOperationQueueWithTimer<T, TQueue, Ev>; - using TActorBase = TActor<TOperationQueueWithTimer<T, TQueue, Ev>>; + using TThis = ::NKikimr::NOperationQueue::TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>; + using TActorBase = TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>>; using TBase = TOperationQueue<T, TQueue>; struct TEvWakeupQueue : public TEventLocal<TEvWakeupQueue, Ev> { @@ -32,6 +34,7 @@ class TOperationQueueWithTimer }; private: + NKikimrServices::EServiceKikimr ServiceId = NKikimrServices::EServiceKikimr(LogServiceId); TActorId LongTimerId; TInstant When; @@ -67,22 +70,28 @@ private: this->Send(LongTimerId, new TEvents::TEvPoison); When = t; - LongTimerId = CreateLongTimer(t - Now(), + auto delta = t - this->Now(); + auto ctx = TActivationContext::ActorContextFor(TActorBase::SelfId()); + LongTimerId = CreateLongTimer(ctx, delta, new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue)); + + LOG_DEBUG_S(ctx, ServiceId, + "Operation queue set NextWakeup# " << When << ", delta# " << delta.Seconds() << " seconds"); } TInstant Now() override { return AppData()->TimeProvider->Now(); } - void HandleWakeup() { + void HandleWakeup(const TActorContext &ctx) { + LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup# " << this->Now()); TBase::Wakeup(); } STFUNC(StateWork) { Y_UNUSED(ctx); switch (ev->GetTypeRewrite()) { - cFunc(TEvWakeupQueue::EventType, HandleWakeup); + CFunc(TEvWakeupQueue::EventType, HandleWakeup); } } }; diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index 4d937c2ad44..702ace05dc7 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -3,15 +3,16 @@ namespace NKikimr { namespace NSchemeShard { -NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardIdx& shardId) { +NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardCompactionInfo& info) { UpdateBackgroundCompactionQueueMetrics(); auto ctx = TActivationContext::ActorContextFor(SelfId()); - auto it = ShardInfos.find(shardId); + const auto& shardIdx = info.ShardIdx; + auto it = ShardInfos.find(shardIdx); if (it == ShardInfos.end()) { LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info " - "for background compaction# " << shardId + "for background compaction# " << shardIdx << " at schemeshard# " << TabletID()); return NOperationQueue::EStartStatus::EOperationRemove; @@ -22,6 +23,7 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction " "for pathId# " << pathId << ", datashard# " << datashardId + << ", compactionInfo# " << info << ", next wakeup# " << CompactionQueue->GetWakeupTime() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" @@ -42,17 +44,17 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha return NOperationQueue::EStartStatus::EOperationRunning; } -void TSchemeShard::OnBackgroundCompactionTimeout(const TShardIdx& shardId) { +void TSchemeShard::OnBackgroundCompactionTimeout(const TShardCompactionInfo& info) { UpdateBackgroundCompactionQueueMetrics(); - TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_TIMEOUT].Increment(1); auto ctx = TActivationContext::ActorContextFor(SelfId()); - auto it = ShardInfos.find(shardId); + const auto& shardIdx = info.ShardIdx; + auto it = ShardInfos.find(shardIdx); if (it == ShardInfos.end()) { LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info " - "for timeout background compaction# " << shardId + "for timeout background compaction# " << shardIdx << " at schemeshard# " << TabletID()); return; } @@ -62,6 +64,7 @@ void TSchemeShard::OnBackgroundCompactionTimeout(const TShardIdx& shardId) { LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout " "for pathId# " << pathId << ", datashard# " << datashardId + << ", compactionInfo# " << info << ", next wakeup# " << CompactionQueue->GetWakeupTime() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" @@ -100,6 +103,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T } else { LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction " "for pathId# " << pathId << ", datashard# " << tabletId + << ", shardIdx# " << shardIdx << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() << ", next wakeup# " << CompactionQueue->GetWakeupTime() << ", rate# " << CompactionQueue->GetRate() @@ -126,6 +130,9 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T case NKikimrTxDataShard::TEvCompactTableResult::BORROWED: TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_BORROWED].Increment(1); break; + case NKikimrTxDataShard::TEvCompactTableResult::LOANED: + TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_LOANED].Increment(1); + break; } UpdateBackgroundCompactionQueueMetrics(); @@ -138,8 +145,30 @@ void TSchemeShard::EnqueueCompaction( if (!CompactionQueue) return; - if (stats.HasBorrowed) + auto ctx = TActivationContext::ActorContextFor(SelfId()); + + if (stats.HasBorrowedData) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "background compaction enqueue skipped shard# " << shardIdx + << " with borrowed parts at schemeshard " << TabletID()); return; + } + + if (stats.HasLoanedData) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "background compaction enqueue skipped shard# " << shardIdx + << " with loaned parts at schemeshard " << TabletID()); + return; + } + + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "background compaction enqueue shard# " << shardIdx + << " with partCount# " << stats.PartCount + << ", rowCount# " << stats.RowCount + << ", searchHeight# " << stats.SearchHeight + << ", memDataSize# " << stats.MemDataSize + << ", lastFullCompaction# " << TInstant::Seconds(stats.FullCompactionTs) + << " at schemeshard " << TabletID()); CompactionQueue->Enqueue(TShardCompactionInfo(shardIdx, stats)); UpdateBackgroundCompactionQueueMetrics(); @@ -152,11 +181,33 @@ void TSchemeShard::UpdateCompaction( if (!CompactionQueue) return; - if (newStats.HasBorrowed) { + auto ctx = TActivationContext::ActorContextFor(SelfId()); + + if (newStats.HasBorrowedData) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "background compaction update removed shard# " << shardIdx + << " with borrowed parts at schemeshard " << TabletID()); + RemoveCompaction(shardIdx); + return; + } + + if (newStats.HasLoanedData) { + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "background compaction update removed shard# " << shardIdx + << " with loaned parts at schemeshard " << TabletID()); RemoveCompaction(shardIdx); return; } + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "background compaction enqueue/update shard# " << shardIdx + << " with partCount# " << newStats.PartCount + << ", rowCount# " << newStats.RowCount + << ", searchHeight# " << newStats.SearchHeight + << ", memDataSize# " << newStats.MemDataSize + << ", lastFullCompaction# " << TInstant::Seconds(newStats.FullCompactionTs) + << " at schemeshard " << TabletID()); + TShardCompactionInfo info(shardIdx, newStats); if (!CompactionQueue->Update(info)) CompactionQueue->Enqueue(std::move(info)); @@ -191,12 +242,18 @@ void TSchemeShard::UpdateShardMetrics( const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats) { - if (newStats.HasBorrowed) + if (newStats.HasBorrowedData) ShardsWithBorrowed.insert(shardIdx); else ShardsWithBorrowed.erase(shardIdx); TabletCounters->Simple()[COUNTER_SHARDS_WITH_BORROWED_DATA].Set(ShardsWithBorrowed.size()); + if (newStats.HasLoanedData) + ShardsWithLoaned.insert(shardIdx); + else + ShardsWithLoaned.erase(shardIdx); + TabletCounters->Simple()[COUNTER_SHARDS_WITH_LOANED_DATA].Set(ShardsWithLoaned.size()); + THashMap<TShardIdx, TPartitionMetrics>::insert_ctx insertCtx; auto it = PartitionMetricsMap.find(shardIdx, insertCtx); if (it != PartitionMetricsMap.end()) { @@ -230,6 +287,9 @@ void TSchemeShard::RemoveShardMetrics(const TShardIdx& shardIdx) { ShardsWithBorrowed.erase(shardIdx); TabletCounters->Simple()[COUNTER_SHARDS_WITH_BORROWED_DATA].Set(ShardsWithBorrowed.size()); + ShardsWithLoaned.erase(shardIdx); + TabletCounters->Simple()[COUNTER_SHARDS_WITH_LOANED_DATA].Set(ShardsWithLoaned.size()); + auto it = PartitionMetricsMap.find(shardIdx); if (it == PartitionMetricsMap.end()) return; diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index 28905d5b3d4..4ade8158a3a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -178,10 +178,11 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte newStats.FullCompactionTs = tableStats.GetLastFullCompactionTs(); newStats.MemDataSize = tableStats.GetInMemSize(); newStats.StartTime = TInstant::MilliSeconds(rec.GetStartTime()); + newStats.HasLoanedData = tableStats.GetHasLoanedParts(); for (ui64 tabletId : rec.GetUserTablePartOwners()) { newStats.PartOwners.insert(TTabletId(tabletId)); if (tabletId != rec.GetDatashardId()) { - newStats.HasBorrowed = true; + newStats.HasBorrowedData = true; } } for (ui64 tabletId : rec.GetSysTablesPartOwners()) { @@ -328,7 +329,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte } } - if (newStats.HasBorrowed) { + if (newStats.HasBorrowedData) { // We don't want to split shards that have borrow parts // We must ask them to compact first CompactEv.Reset(new TEvDataShard::TEvCompactBorrowed(tableId)); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index f0df5fa4542..8059d196d7f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -290,6 +290,7 @@ void TSchemeShard::Clear() { UpdateBackgroundCompactionQueueMetrics(); } ShardsWithBorrowed.clear(); + ShardsWithLoaned.clear(); PersQueueGroups.clear(); RtmrVolumes.clear(); SubDomains.clear(); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 07f58ad7d02..aebd6694934 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -78,7 +78,8 @@ private: using TCompactionQueue = NOperationQueue::TOperationQueueWithTimer< TShardCompactionInfo, TCompactionQueueImpl, - TEvPrivate::EvRunBackgroundCompaction>; + TEvPrivate::EvRunBackgroundCompaction, + NKikimrServices::FLAT_TX_SCHEMESHARD>; class TCompactionStarter : public TCompactionQueue::IStarter { public: @@ -87,11 +88,11 @@ private: { } NOperationQueue::EStartStatus StartOperation(const TShardCompactionInfo& info) override { - return Self->StartBackgroundCompaction(info.ShardIdx); + return Self->StartBackgroundCompaction(info); } void OnTimeout(const TShardCompactionInfo& info) override { - Self->OnBackgroundCompactionTimeout(info.ShardIdx); + Self->OnBackgroundCompactionTimeout(info); } private: @@ -205,7 +206,8 @@ public: TCompactionStarter CompactionStarter; TCompactionQueue* CompactionQueue = nullptr; - THashSet<TShardIdx> ShardsWithBorrowed; + THashSet<TShardIdx> ShardsWithBorrowed; // shards have parts from another shards + THashSet<TShardIdx> ShardsWithLoaned; // shards have parts loaned to another shards bool EnableBackgroundCompaction = false; bool EnableBackgroundCompactionServerless = false; @@ -645,8 +647,8 @@ public: void ShardRemoved(const TShardIdx& shardIdx); - NOperationQueue::EStartStatus StartBackgroundCompaction(const TShardIdx& shardId); - void OnBackgroundCompactionTimeout(const TShardIdx& shardId); + NOperationQueue::EStartStatus StartBackgroundCompaction(const TShardCompactionInfo& info); + void OnBackgroundCompactionTimeout(const TShardCompactionInfo& info); void UpdateBackgroundCompactionQueueMetrics(); struct TTxCleanDroppedSubDomains; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 332746dae08..67c209da1e1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1441,7 +1441,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings, } // We don't want to merge shards that have borrowed non-compacted data - if (stats->HasBorrowed) + if (stats->HasBorrowedData) return false; bool canMerge = false; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 73dced02199..9175c1b86e3 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -247,7 +247,10 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> { ui32 ShardState = NKikimrTxDataShard::Unknown; // True when PartOwners has parts from other tablets - bool HasBorrowed = false; + bool HasBorrowedData = false; + + // True when lent parts to other tablets + bool HasLoanedData = false; // Tablet actor started at TInstant StartTime; diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp index bcc38c2d8ac..1b05ad8d66a 100644 --- a/ydb/core/tx/schemeshard/ut_compaction.cpp +++ b/ydb/core/tx/schemeshard/ut_compaction.cpp @@ -46,6 +46,69 @@ std::pair<TTableInfoMap, ui64> GetTables( return std::make_pair(result, ownerId); } +struct TPathInfo { + ui64 OwnerId = TTestTxConfig::SchemeShard; + NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable; + TVector<ui64> Shards; +}; + +TPathInfo GetPathInfo( + TTestActorRuntime &runtime, + const char* fullPath, + ui64 schemeshardId = TTestTxConfig::SchemeShard) +{ + TPathInfo info; + auto description = DescribePrivatePath(runtime, schemeshardId, fullPath, true, true); + for (auto &part : description.GetPathDescription().GetTablePartitions()) + info.Shards.push_back(part.GetDatashardId()); + + auto [tables, ownerId] = GetTables(runtime, info.Shards.at(0)); + auto userTableName = TStringBuf(fullPath).RNextTok('/'); + info.UserTable = tables[userTableName]; + info.OwnerId = ownerId; + + return info; +} + +void CreateTableWithData( + TTestActorRuntime &runtime, + TTestEnv& env, + const char* path, + const char* name, + ui32 shardsCount, + ui64& txId, + ui64 schemeshardId = TTestTxConfig::SchemeShard) +{ + TestCreateTable(runtime, schemeshardId, ++txId, path, + Sprintf(R"____( + Name: "%s" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Utf8"} + KeyColumnNames: ["key"] + UniformPartitionsCount: %d + )____", name, shardsCount)); + env.TestWaitNotification(runtime, txId, schemeshardId); + + auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) { + TString writeQuery = Sprintf(R"( + ( + (let key '( '('key (Uint64 '%lu)) ) ) + (let value '('('value (Utf8 'MostMeaninglessValueInTheWorld)) ) ) + (return (AsList (UpdateRow '__user__%s key value) )) + ) + )", key, tableName); + NKikimrMiniKQL::TResult result; + TString err; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err); + UNIT_ASSERT_VALUES_EQUAL(err, ""); + UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);; + }; + + for (ui64 key = 0; key < 100; ++key) { + fnWriteRow(TTestTxConfig::FakeHiveTablets, key, name); + } +} + void SetFeatures( TTestActorRuntime &runtime, TTestEnv&, @@ -61,6 +124,11 @@ void SetFeatures( compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig->MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + // 1 compaction / second + compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); + compactionConfig->MutableBackgroundCompactionConfig()->SetMaxRate(1); + compactionConfig->MutableBackgroundCompactionConfig()->SetRoundSeconds(0); + auto sender = runtime.AllocateEdgeActor(); runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); @@ -96,6 +164,11 @@ void DisableBackgroundCompactionViaRestart( compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + // 1 compaction / second + compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); + compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1); + compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0); + TActorId sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, schemeShard, sender); } @@ -116,11 +189,33 @@ void EnableBackgroundCompactionViaRestart( compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + // 1 compaction / second + compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); + compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1); + compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0); + TActorId sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, schemeShard, sender); } -ui64 GetCompactionsCount( +struct TCompactionStats { + ui64 BackgroundRequestCount = 0; + ui64 BackgroundCompactionCount = 0; + + TCompactionStats() = default; + + TCompactionStats(const NKikimrTxDataShard::TEvGetCompactTableStatsResult& stats) + : BackgroundRequestCount(stats.GetBackgroundCompactionRequests()) + , BackgroundCompactionCount(stats.GetBackgroundCompactionCount()) + {} + + void Update(const TCompactionStats& other) { + BackgroundRequestCount += other.BackgroundRequestCount; + BackgroundCompactionCount += other.BackgroundCompactionCount; + } +}; + +TCompactionStats GetCompactionStats( TTestActorRuntime &runtime, const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, ui64 tabletId, @@ -135,25 +230,41 @@ ui64 GetCompactionsCount( auto response = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetCompactTableStatsResult>(handle); UNIT_ASSERT(response->Record.HasBackgroundCompactionRequests()); - return response->Record.GetBackgroundCompactionRequests(); + return TCompactionStats(response->Record); } -ui64 GetCompactionsCount( +TCompactionStats GetCompactionStats( TTestActorRuntime &runtime, const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, const TVector<ui64>& shards, ui64 ownerId) { - ui64 compactionsCount = 0; + TCompactionStats stats; + for (auto shard: shards) { - compactionsCount += GetCompactionsCount( + stats.Update(GetCompactionStats( runtime, userTable, shard, - ownerId); + ownerId)); } - return compactionsCount; + return stats; +} + +TCompactionStats GetCompactionStats( + TTestActorRuntime &runtime, + const TString& path, + ui64 schemeshardId = TTestTxConfig::SchemeShard) +{ + auto info = GetPathInfo(runtime, path.c_str(), schemeshardId); + UNIT_ASSERT(!info.Shards.empty()); + + return GetCompactionStats( + runtime, + info.UserTable, + info.Shards, + info.OwnerId); } void CheckShardCompacted( @@ -163,11 +274,11 @@ void CheckShardCompacted( ui64 ownerId, bool shouldCompacted = true) { - auto count = GetCompactionsCount( + auto count = GetCompactionStats( runtime, userTable, tabletId, - ownerId); + ownerId).BackgroundRequestCount; if (shouldCompacted) { UNIT_ASSERT(count > 0); @@ -176,39 +287,30 @@ void CheckShardCompacted( } } -void CheckNoCompactions( +void CheckNoCompactionsInPeriod( TTestActorRuntime &runtime, TTestEnv& env, - ui64 schemeshardId, - const TString& path) + const TString& path, + ui64 schemeshardId = TTestTxConfig::SchemeShard) { - auto description = DescribePrivatePath(runtime, schemeshardId, path, true, true); - TVector<ui64> shards; - for (auto &part : description.GetPathDescription().GetTablePartitions()) - shards.push_back(part.GetDatashardId()); - - UNIT_ASSERT(!shards.empty()); + auto info = GetPathInfo(runtime, path.c_str(), schemeshardId); + UNIT_ASSERT(!info.Shards.empty()); env.SimulateSleep(runtime, TDuration::Seconds(30)); - auto [tables, ownerId] = GetTables(runtime, shards.at(0)); - - auto userTableName = TStringBuf(path).RNextTok('/'); - const auto& userTable = tables[userTableName]; - - auto count1 = GetCompactionsCount( + auto count1 = GetCompactionStats( runtime, - userTable, - shards, - ownerId); + info.UserTable, + info.Shards, + info.OwnerId).BackgroundRequestCount; env.SimulateSleep(runtime, TDuration::Seconds(30)); - auto count2 = GetCompactionsCount( + auto count2 = GetCompactionStats( runtime, - userTable, - shards, - ownerId); + info.UserTable, + info.Shards, + info.OwnerId).BackgroundRequestCount; UNIT_ASSERT_VALUES_EQUAL(count1, count2); } @@ -221,29 +323,14 @@ void TestBackgroundCompaction( { ui64 txId = 1000; - TestCreateTable(runtime, ++txId, "/MyRoot", - R"____( - Name: "Simple" - Columns { Name: "key1" Type: "Uint32"} - Columns { Name: "Value" Type: "Utf8"} - KeyColumnNames: ["key1"] - UniformPartitionsCount: 2 - )____"); - env.TestWaitNotification(runtime, txId); + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 2, txId); + auto info = GetPathInfo(runtime, "/MyRoot/Simple"); enableBackgroundCompactionFunc(runtime, env); - - auto description = DescribePrivatePath(runtime, "/MyRoot/Simple", true, true); - TVector<ui64> shards; - for (auto &part : description.GetPathDescription().GetTablePartitions()) - shards.push_back(part.GetDatashardId()); - env.SimulateSleep(runtime, TDuration::Seconds(30)); - auto [tables, ownerId] = GetTables(runtime, shards.at(0)); - - for (auto shard: shards) - CheckShardCompacted(runtime, tables["Simple"], shard, ownerId); + for (auto shard: info.Shards) + CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId); } ui64 TestServerless( @@ -329,17 +416,13 @@ ui64 TestServerless( // turn on background compaction EnableBackgroundCompactionViaRestart(runtime, env, schemeshardId, enableServerless); - auto description = DescribePrivatePath(runtime, schemeshardId, "/MyRoot/User/Simple", true, true); - TVector<ui64> shards; - for (auto &part : description.GetPathDescription().GetTablePartitions()) - shards.push_back(part.GetDatashardId()); + auto info = GetPathInfo(runtime, "/MyRoot/User/Simple", schemeshardId); + UNIT_ASSERT(!info.Shards.empty()); env.SimulateSleep(runtime, TDuration::Seconds(30)); - auto [tables, ownerId] = GetTables(runtime, shards.at(0)); - - for (auto shard: shards) - CheckShardCompacted(runtime, tables["Simple"], shard, ownerId, enableServerless); + for (auto shard: info.Shards) + CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId, enableServerless); return schemeshardId; } @@ -402,7 +485,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { // some time to finish compactions in progress env.SimulateSleep(runtime, TDuration::Seconds(30)); - CheckNoCompactions(runtime, env, TTestTxConfig::SchemeShard, "/MyRoot/Simple"); + CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); } Y_UNIT_TEST(ShouldNotCompactServerless) { @@ -444,7 +527,88 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { // some time to finish compactions in progress env.SimulateSleep(runtime, TDuration::Seconds(30)); - CheckNoCompactions(runtime, env, schemeshardId, "/MyRoot/User/Simple"); + CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId); + } + + Y_UNIT_TEST(SchemeshardShouldNotCompactBackups) { + // enabled via schemeshard restart + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + // disable for the case, when compaction is enabled by default + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false); + + ui64 txId = 1000; + + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 2, txId); + + // backup table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + CopyFromTable: "/MyRoot/Simple" + IsBackup: true + )"); + env.TestWaitNotification(runtime, txId); + + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + + CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); + UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL); + } + + Y_UNIT_TEST(SchemeshardShouldNotCompactBorrowed) { + // enabled via schemeshard restart + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + //runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_TRACE); + + // disable for the case, when compaction is enabled by default + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false); + + // capture original observer func by setting dummy one + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCompactBorrowed: + // we should not compact borrowed to check that background compaction + // will not compact shard with borrowed parts as well + Y_UNUSED(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + default: + return originalObserver(runtime, ev); + } + }); + + ui64 txId = 1000; + + // note that we create 1-sharded table to avoid complications + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + // copy table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + CopyFromTable: "/MyRoot/Simple" + )"); + env.TestWaitNotification(runtime, txId); + + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + + CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); + UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL); + + // original table should not be compacted as well + CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); } }; |
