diff options
author | Evgeniy Ivanov <eivanov89@yandex-team.ru> | 2022-06-06 16:47:10 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 16:47:10 +0300 |
commit | 31846aab71130c74baf2a34115726954b1ff26df (patch) | |
tree | 07fc8b91c5ed880df145e541f52b8ec2f0870bf9 | |
parent | fa8ff7b07c7ef2cac601269486b51a1b720dcd02 (diff) | |
download | ydb-31846aab71130c74baf2a34115726954b1ff26df.tar.gz |
PR from branch users/eivanov89/compaction_fixes_22-2
KIKIMR-14017: implement queue to compact borrowed data
REVIEW: 2550334
KIKIMR-14761: fix bug to always wakeup compaction queue
REVIEW: 2560061
REVIEW: 2600932
x-ydb-stable-ref: 29443775e0b4b67d4a92285ff83902639665a1ae
33 files changed, 1110 insertions, 198 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8bc714d0d1..167348a284 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1440,7 +1440,16 @@ message TCompactionConfig { optional bool CompactSinglePartedShards = 10 [default = false]; } + message TBorrowedCompactionConfig { + optional double MaxRate = 1 [default = 0]; // unlimitted + optional uint64 InflightLimit = 2 [default = 10]; // TODO: consider more? + + // After this interval we will try to restart + optional uint64 TimeoutSeconds = 3 [default = 15]; + } + optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1; + optional TBorrowedCompactionConfig BorrowedCompactionConfig = 2; } // This message is used to upload custom service configs diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index bd0534b651..d68c7d1387 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -110,6 +110,7 @@ enum ECumulativeCounters { COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START = 84 [(CounterOpts) = {Name: "TxCompactTableFailedStart"}]; COUNTER_FULL_COMPACTION_DONE = 85 [(CounterOpts) = {Name: "FullCompactionCount"}]; COUNTER_TX_BACKGROUND_COMPACTION_FAILED_LOANED = 86 [(CounterOpts) = {Name: "TxCompactTableFailedLoaned"}]; + COUNTER_TX_COMPACT_BORROWED = 87 [(CounterOpts) = {Name: "TxCompactBorrowed"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 4a2188d4fb..570566e293 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -155,6 +155,9 @@ enum ESimpleCounters { COUNTER_COMPACTION_QUEUE_WAITING_REPEAT = 126 [(CounterOpts) = {Name: "BackgroundCompactionQueueWaitingRepeat"}]; COUNTER_SHARDS_WITH_BORROWED_DATA = 127 [(CounterOpts) = {Name: "TableShardsWithBorrowedData"}]; COUNTER_SHARDS_WITH_LOANED_DATA = 128 [(CounterOpts) = {Name: "TableShardsWithLoanedData"}]; + + COUNTER_BORROWED_COMPACTION_QUEUE_SIZE = 129 [(CounterOpts) = {Name: "BorrowedCompactionQueueSize"}]; + COUNTER_BORROWED_COMPACTION_QUEUE_RUNNING = 130 [(CounterOpts) = {Name: "BorrowedCompactionQueueRunning"}]; } enum ECumulativeCounters { @@ -246,6 +249,9 @@ enum ECumulativeCounters { COUNTER_BACKGROUND_COMPACTION_BORROWED = 73 [(CounterOpts) = {Name: "BackgroundCompactionBorrowed"}]; COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED = 74 [(CounterOpts) = {Name: "BackgroundCompactionNotNeeded"}]; COUNTER_BACKGROUND_COMPACTION_LOANED = 75 [(CounterOpts) = {Name: "BackgroundCompactionLoaned"}]; + + COUNTER_BORROWED_COMPACTION_OK = 76 [(CounterOpts) = {Name: "BorrowedCompactionOK"}]; + COUNTER_BORROWED_COMPACTION_TIMEOUT = 77 [(CounterOpts) = {Name: "BorrowedCompactionTimeout"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 001c68d222..32c041750e 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1430,6 +1430,11 @@ message TEvCompactBorrowed { optional NKikimrProto.TPathID PathId = 1; } +message TEvCompactBorrowedResult { + optional uint64 TabletId = 1; + optional NKikimrProto.TPathID PathId = 2; +} + message TEvGetCompactTableStats { optional NKikimrProto.TPathID PathId = 1; } @@ -1437,6 +1442,7 @@ message TEvGetCompactTableStats { message TEvGetCompactTableStatsResult { optional uint64 BackgroundCompactionRequests = 1; optional uint64 BackgroundCompactionCount = 2; + optional uint64 CompactBorrowedCount = 3; } // TEvRead is used to request multiple queries from a shard and diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index bf02891c77..8f135775cd 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -386,6 +386,11 @@ TAutoPtr<TSubset> TDatabase::ScanSnapshot(ui32 table, TRowVersion snapshot) return Require(table)->ScanSnapshot(snapshot); } +bool TDatabase::HasBorrowed(ui32 table, ui64 selfTabletId) const +{ + return Require(table)->HasBorrowed(selfTabletId); +} + TBundleSlicesMap TDatabase::LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const { return Require(table)->LookupSlices(bundles); diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index eda7bd4568..91bd2865f3 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -144,6 +144,8 @@ public: TAutoPtr<TSubset> Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const; TAutoPtr<TSubset> ScanSnapshot(ui32 table, TRowVersion snapshot = TRowVersion::Max()); + bool HasBorrowed(ui32 table, ui64 selfTabletId) const; + TBundleSlicesMap LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const; void ReplaceSlices(ui32 table, TBundleSlicesMap slices); diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 10242892e4..76ef090dd1 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -3812,6 +3812,11 @@ bool TExecutor::HasLoanedParts() const { return false; } +bool TExecutor::HasBorrowed(ui32 table, ui64 selfTabletId) const { + Y_VERIFY_S(Database, "Checking borrowers of table# " << table << " for tablet# " << selfTabletId); + return Database->HasBorrowed(table, selfTabletId); +} + const TExecutorStats& TExecutor::GetStats() const { return *Stats; } diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index b4d519b6e0..b9c6753bf0 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -649,6 +649,8 @@ public: THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const override; bool HasLoanedParts() const override; + bool HasBorrowed(ui32 table, ui64 selfTabletId) const override; + const TExecutorStats& GetStats() const override; NMetrics::TResourceMetrics* GetResourceMetrics() const override; diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index 11b3b90199..f746954346 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -122,6 +122,23 @@ TAutoPtr<TSubset> TTable::Subset(TEpoch head) const noexcept return subset; } +bool TTable::HasBorrowed(ui64 selfTabletId) const noexcept +{ + for (const auto &it : TxStatus) + if (it.second->Label.TabletID() != selfTabletId) + return true; + + for (auto &it: Flatten) + if (it.second->Label.TabletID() != selfTabletId) + return true; + + for (auto &it: ColdParts) + if (it.second->Label.TabletID() != selfTabletId) + return true; + + return false; +} + TAutoPtr<TSubset> TTable::ScanSnapshot(TRowVersion snapshot) noexcept { TAutoPtr<TSubset> subset = new TSubset(Epoch, Scheme); @@ -915,16 +932,6 @@ TCompactionStats TTable::GetCompactionStats() const stats.MemRowCount = GetMemRowCount(); stats.MemDataSize = GetMemSize(); stats.MemDataWaste = GetMemWaste(); - - for (auto &it: ColdParts) - stats.PartOwners.insert(it.second->Label.TabletID()); - - for (auto &it: Flatten) - stats.PartOwners.insert(it.second->Label.TabletID()); - - for (auto &it: TxStatus) - stats.PartOwners.insert(it.second->Label.TabletID()); - stats.PartCount = Flatten.size() + ColdParts.size(); return stats; diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index a1fdeb9709..91dbf6f3fc 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -80,6 +80,8 @@ public: TAutoPtr<TSubset> ScanSnapshot(TRowVersion snapshot = TRowVersion::Max()) noexcept; TAutoPtr<TSubset> Unwrap() noexcept; /* full Subset(..) + final Replace(..) */ + bool HasBorrowed(ui64 selfTabletId) const noexcept; + /** * Returns current slices for bundles * diff --git a/ydb/core/tablet_flat/flat_table_stats.h b/ydb/core/tablet_flat/flat_table_stats.h index 5b600dcf13..41b26a85fb 100644 --- a/ydb/core/tablet_flat/flat_table_stats.h +++ b/ydb/core/tablet_flat/flat_table_stats.h @@ -39,7 +39,6 @@ namespace NTable { }; struct TCompactionStats { - THashSet<ui64> PartOwners; ui64 PartCount = 0; ui64 MemRowCount = 0; ui64 MemDataSize = 0; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 677106bc6b..f5949deb66 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -548,6 +548,8 @@ namespace NFlatExecutorSetup { virtual THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const = 0; virtual bool HasLoanedParts() const = 0; + virtual bool HasBorrowed(ui32 table, ui64 selfTabletId) const = 0; + // This method lets executor know about new yellow channels virtual void OnYellowChannels(TVector<ui32> yellowMoveChannels, TVector<ui32> yellowStopChannels) = 0; diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 898e4a502b..1de4c368ce 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -299,6 +299,8 @@ struct TEvDataShard { EvReadAck, EvReadCancel, + EvCompactBorrowedResult, + EvEnd }; @@ -946,7 +948,7 @@ struct TEvDataShard { // Arrow std::shared_ptr<arrow::RecordBatch> ArrowBatch; - + private: // for local events TVector<TOwnedCellVec> Rows; @@ -1396,23 +1398,47 @@ struct TEvDataShard { /** * This message is used to ask datashard to compact any borrowed parts it has - * for the specified user table. No reply is expected for this message, - * instead schemeshard expects to receive updated stats if and when - * such a compaction has finished. + * for the specified user table. */ - struct TEvCompactBorrowed : public TEventPB<TEvCompactBorrowed, NKikimrTxDataShard::TEvCompactBorrowed, EvCompactBorrowed> { + struct TEvCompactBorrowed : public TEventPB<TEvCompactBorrowed, + NKikimrTxDataShard::TEvCompactBorrowed, + EvCompactBorrowed> { TEvCompactBorrowed() = default; - TEvCompactBorrowed(const TPathId& pathId) { - Record.MutablePathId()->SetOwnerId(pathId.OwnerId); - Record.MutablePathId()->SetLocalId(pathId.LocalPathId); + TEvCompactBorrowed(ui64 ownerId, ui64 localId) { + Record.MutablePathId()->SetOwnerId(ownerId); + Record.MutablePathId()->SetLocalId(localId); } + TEvCompactBorrowed(const TPathId& pathId) + : TEvCompactBorrowed(pathId.OwnerId, pathId.LocalPathId) + { } + // Sanity check for safe merging to earlier versions static_assert(EvCompactBorrowed == EventSpaceBegin(TKikimrEvents::ES_TX_DATASHARD) + 7 * 512 + 60, "EvCompactBorrowed event has an unexpected value"); }; + struct TEvCompactBorrowedResult : public TEventPB<TEvCompactBorrowedResult, + NKikimrTxDataShard::TEvCompactBorrowedResult, + EvCompactBorrowedResult> { + TEvCompactBorrowedResult() = default; + + TEvCompactBorrowedResult(ui64 tabletId, ui64 ownerId, ui64 localId) { + Record.SetTabletId(tabletId); + Record.MutablePathId()->SetOwnerId(ownerId); + Record.MutablePathId()->SetLocalId(localId); + } + + TEvCompactBorrowedResult(ui64 tabletId, const TPathId& pathId) + : TEvCompactBorrowedResult(tabletId, pathId.OwnerId, pathId.LocalPathId) + {} + + // Sanity check for safe merging to earlier versions + static_assert(EvCompactBorrowedResult == EventSpaceBegin(TKikimrEvents::ES_TX_DATASHARD) + 7 * 512 + 68, + "EvCompactBorrowedResult event has an unexpected value"); + }; + struct TEvGetCompactTableStats : public TEventPB<TEvGetCompactTableStats, NKikimrTxDataShard::TEvGetCompactTableStats, TEvDataShard::EvGetCompactTableStats> { TEvGetCompactTableStats() = default; diff --git a/ydb/core/tx/datashard/datashard__compact_borrowed.cpp b/ydb/core/tx/datashard/datashard__compact_borrowed.cpp index 8d36ee99a3..736238a44f 100644 --- a/ydb/core/tx/datashard/datashard__compact_borrowed.cpp +++ b/ydb/core/tx/datashard/datashard__compact_borrowed.cpp @@ -21,38 +21,31 @@ public: << " for table " << pathId << " at tablet " << Self->TabletID()); + auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(Self->TabletID(), pathId); + if (pathId.OwnerId != Self->GetPathOwnerId()) { // Ignore unexpected owner + ctx.Send(Ev->Sender, std::move(response)); return true; } auto it = Self->TableInfos.find(pathId.LocalPathId); if (it == Self->TableInfos.end()) { // Ignore unexpected table (may normally happen with races) + ctx.Send(Ev->Sender, std::move(response)); return true; } const TUserTable& tableInfo = *it->second; - auto subset = txc.DB.Subset(tableInfo.LocalTid, NTable::TEpoch::Max(), {}, {}); - - THashSet<ui64> partOwners; - NTable::GetPartOwners(*subset, partOwners); - - bool hasBorrowed = false; - for (ui64 tabletId : partOwners) { - if (tabletId != Self->TabletID()) { - hasBorrowed = true; - break; - } - } - + bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID()); if (!hasBorrowed) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "TEvCompactBorrowed request from " << Ev->Sender << " for table " << pathId << " has no borrowed parts" << " at tablet " << Self->TabletID()); + ctx.Send(Ev->Sender, std::move(response)); return true; } @@ -61,7 +54,13 @@ public: << " for table " << pathId << " starting compaction for local table " << tableInfo.LocalTid << " at tablet " << Self->TabletID()); + Self->Executor()->CompactBorrowed(tableInfo.LocalTid); + Self->IncCounter(COUNTER_TX_COMPACT_BORROWED); + ++tableInfo.Stats.CompactBorrowedCount; + + Self->CompactBorrowedWaiters[tableInfo.LocalTid].emplace_back(Ev->Sender); + return true; } diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp index 317767d736..70001db0b3 100644 --- a/ydb/core/tx/datashard/datashard__compaction.cpp +++ b/ydb/core/tx/datashard/datashard__compaction.cpp @@ -68,17 +68,7 @@ public: ++tableInfo.Stats.BackgroundCompactionRequests; - auto stats = txc.DB.GetCompactionStats(localTid); - - bool hasBorrowed = false; - if (stats.PartOwners.size() > 1) { - hasBorrowed = true; - } else if (stats.PartOwners.size() == 1) { - if (*stats.PartOwners.begin() != Self->TabletID()) { - hasBorrowed = true; - } - } - + bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID()); if (hasBorrowed && !record.GetCompactBorrowed()) { // normally we should not receive requests to compact in this case // but in some rare cases like schemeshard restart we can @@ -117,6 +107,7 @@ public: return true; } + auto stats = txc.DB.GetCompactionStats(localTid); bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0; bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0; if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) { @@ -151,7 +142,7 @@ public: << ", memtableRows# " << stats.MemRowCount); Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION); - Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, pathId, Ev->Sender)); + Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, Ev->Sender)); ++tableInfo.Stats.BackgroundCompactionCount; } else { // compaction failed, for now we don't care @@ -207,6 +198,7 @@ void TDataShard::Handle(TEvDataShard::TEvCompactTable::TPtr& ev, const TActorCon void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) { auto finishedInfo = Executor()->GetFinishedCompactionInfo(tableId); + TLocalPathId localPathId = InvalidLocalPathId; if (tableId >= Schema::MinLocalTid) { for (auto& ti : TableInfos) { if (ti.second->LocalTid != tableId && ti.second->ShadowTid != tableId) @@ -223,40 +215,68 @@ void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) { } ti.second->StatsNeedUpdate = true; + localPathId = ti.first; UpdateTableStats(ctx); break; } } - ReplyCompactionWaiters(tableId, finishedInfo.Edge, ctx); + ReplyCompactionWaiters(tableId, localPathId, finishedInfo, ctx); } -void TDataShard::ReplyCompactionWaiters(ui32 tableId, ui64 edge, const TActorContext &ctx) { +void TDataShard::ReplyCompactionWaiters( + ui32 tableId, + TLocalPathId localPathId, + const NTabletFlatExecutor::TFinishedCompactionInfo& compactionInfo, + const TActorContext &ctx) +{ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "CompactionComplete of tablet# "<< TabletID() << ", table# " << tableId - << ", finished edge# " << edge + << ", finished edge# " << compactionInfo.Edge << ", front# " << (CompactionWaiters[tableId].empty() ? 0UL : std::get<0>(CompactionWaiters[tableId].front()))); - while (!CompactionWaiters[tableId].empty()) { + auto& fullCompactionQueue = CompactionWaiters[tableId]; + while (!fullCompactionQueue.empty()) { const auto& waiter = CompactionWaiters[tableId].front(); - if (std::get<0>(waiter) > edge) + if (std::get<0>(waiter) > compactionInfo.Edge) break; - const auto& pathId = std::get<1>(waiter); - const auto& sender = std::get<2>(waiter); + const auto& sender = std::get<1>(waiter); auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>( TabletID(), - pathId.OwnerId, - pathId.LocalPathId, + GetPathOwnerId(), + localPathId, NKikimrTxDataShard::TEvCompactTableResult::OK); ctx.Send(sender, std::move(response)); LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Sending TEvCompactTableResult to# " << sender - << "pathId# " << pathId); + << "pathId# " << TPathId(GetPathOwnerId(), localPathId)); - CompactionWaiters[tableId].pop_front(); + fullCompactionQueue.pop_front(); + } + + auto& compactBorrowedQueue = CompactBorrowedWaiters[tableId]; + if (!compactBorrowedQueue.empty()) { + const bool hasBorrowed = Executor()->HasBorrowed(tableId, TabletID()); + if (!hasBorrowed) { + while (!compactBorrowedQueue.empty()) { + const auto& waiter = compactBorrowedQueue.front(); + + auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>( + TabletID(), + GetPathOwnerId(), + localPathId); + ctx.Send(waiter, std::move(response)); + + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Sending TEvCompactBorrowedResult to# " << waiter + << "pathId# " << TPathId(GetPathOwnerId(), localPathId)); + + compactBorrowedQueue.pop_front(); + } + } } } @@ -274,6 +294,7 @@ void TDataShard::Handle(TEvDataShard::TEvGetCompactTableStats::TPtr& ev, const T const TUserTable& tableInfo = *it->second; response->Record.SetBackgroundCompactionRequests(tableInfo.Stats.BackgroundCompactionRequests); response->Record.SetBackgroundCompactionCount(tableInfo.Stats.BackgroundCompactionCount); + response->Record.SetCompactBorrowedCount(tableInfo.Stats.CompactBorrowedCount); } ctx.Send(ev->Sender, std::move(response)); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 8c7272e32b..e9ddb66d70 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1309,7 +1309,11 @@ public: void CompactionComplete(ui32 tableId, const TActorContext &ctx) override; void CompletedLoansChanged(const TActorContext &ctx) override; - void ReplyCompactionWaiters(ui32 tableId, ui64 edge, const TActorContext &ctx); + void ReplyCompactionWaiters( + ui32 tableId, + TLocalPathId localPathId, + const NTabletFlatExecutor::TFinishedCompactionInfo& compactionInfo, + const TActorContext &ctx); TUserTable::TSpecialUpdate SpecialUpdates(const NTable::TDatabase& db, const TTableId& tableId) const; @@ -2198,8 +2202,8 @@ private: // in THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id - // compactionId, tableId/ownerId, actorId - using TCompactionWaiter = std::tuple<ui64, TPathId, TActorId>; + // compactionId, actorId + using TCompactionWaiter = std::tuple<ui64, TActorId>; using TCompactionWaiterList = TList<TCompactionWaiter>; // tableLocalTid -> waiters, note that compactionId is monotonically @@ -2208,6 +2212,11 @@ private: // from the front THashMap<ui32, TCompactionWaiterList> CompactionWaiters; + using TCompactBorrowedWaiterList = TList<TActorId>; + + // tableLocalTid -> waiters, similar to CompactionWaiters + THashMap<ui32, TCompactBorrowedWaiterList> CompactBorrowedWaiters; + struct TReplicationSourceOffsetsReceiveState { // A set of tables for which we already received offsets THashSet<TPathId> Received; diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 6bbd8dec1a..f3b6f8112c 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -320,6 +320,7 @@ struct TUserTable : public TThrRefBase { ui64 RowCountResolution = 0; ui64 BackgroundCompactionRequests = 0; ui64 BackgroundCompactionCount = 0; + ui64 CompactBorrowedCount = 0; NTable::TKeyAccessSample AccessStats; void Update(NTable::TStats&& dataStats, ui64 indexSize, THashSet<ui64>&& partOwners, ui64 partCount, TInstant statsUpdateTime) { diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index f4fa7ace57..14d5530153 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -36,7 +36,7 @@ class TOperationQueueWithTimer private: NKikimrServices::EServiceKikimr ServiceId = NKikimrServices::EServiceKikimr(LogServiceId); TActorId LongTimerId; - TInstant When; + TMonotonic When; public: TOperationQueueWithTimer(const typename TBase::TConfig& config, @@ -60,31 +60,32 @@ public: TActorBase::PassAway(); } - TInstant GetWakeupTime() const { return When; } + TDuration GetWakeupDelta() const { return When - const_cast<TThis*>(this)->Now(); } private: // ITimer, note that it is made private, // since it should be called only from TBase - void SetWakeupTimer(TInstant t) override { - if (When > t) + void SetWakeupTimer(TDuration delta) override { + if (LongTimerId) this->Send(LongTimerId, new TEvents::TEvPoison); - When = t; - auto delta = t - this->Now(); + When = this->Now() + delta; auto ctx = TActivationContext::ActorContextFor(TActorBase::SelfId()); LongTimerId = CreateLongTimer(ctx, delta, new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue)); LOG_DEBUG_S(ctx, ServiceId, - "Operation queue set NextWakeup# " << When << ", delta# " << delta.Seconds() << " seconds"); + "Operation queue set wakeup after delta# " << delta.Seconds() << " seconds"); } - TInstant Now() override { - return AppData()->TimeProvider->Now(); + TMonotonic Now() override { + return AppData()->MonotonicTimeProvider->Now(); } void HandleWakeup(const TActorContext &ctx) { - LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup# " << this->Now()); + LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup"); + When = {}; + LongTimerId = {}; TBase::Wakeup(); } diff --git a/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp new file mode 100644 index 0000000000..1da2fd32e6 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp @@ -0,0 +1,161 @@ +#include "schemeshard_impl.h" + +namespace NKikimr::NSchemeShard { + +NOperationQueue::EStartStatus TSchemeShard::StartBorrowedCompaction(const TShardIdx& shardIdx) { + UpdateBorrowedCompactionQueueMetrics(); + + auto ctx = TActivationContext::ActorContextFor(SelfId()); + + auto it = ShardInfos.find(shardIdx); + if (it == ShardInfos.end()) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info " + "for borrowed compaction# " << shardIdx + << " at schemeshard# " << TabletID()); + + return NOperationQueue::EStartStatus::EOperationRemove; + } + + const auto& datashardId = it->second.TabletID; + const auto& pathId = it->second.PathId; + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBorrowedCompaction " + "for pathId# " << pathId << ", datashard# " << datashardId + << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() + << ", rate# " << BorrowedCompactionQueue->GetRate() + << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" + << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" + << " at schemeshard " << TabletID()); + + std::unique_ptr<TEvDataShard::TEvCompactBorrowed> request( + new TEvDataShard::TEvCompactBorrowed(pathId.OwnerId, pathId.LocalPathId)); + + RunningBorrowedCompactions[shardIdx] = PipeClientCache->Send( + ctx, + ui64(datashardId), + request.release()); + + return NOperationQueue::EStartStatus::EOperationRunning; +} + +void TSchemeShard::OnBorrowedCompactionTimeout(const TShardIdx& shardIdx) { + UpdateBorrowedCompactionQueueMetrics(); + TabletCounters->Cumulative()[COUNTER_BORROWED_COMPACTION_TIMEOUT].Increment(1); + + RunningBorrowedCompactions.erase(shardIdx); + + auto ctx = TActivationContext::ActorContextFor(SelfId()); + + auto it = ShardInfos.find(shardIdx); + if (it == ShardInfos.end()) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info " + "for timeout borrowed compaction# " << shardIdx + << " at schemeshard# " << TabletID()); + return; + } + + const auto& datashardId = it->second.TabletID; + const auto& pathId = it->second.PathId; + + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Borrowed compaction timeout " + "for pathId# " << pathId << ", datashard# " << datashardId + << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() + << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" + << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" + << " at schemeshard " << TabletID()); + + // retry + EnqueueBorrowedCompaction(shardIdx); +} + +void TSchemeShard::BorrowedCompactionHandleDisconnect(TTabletId tabletId, const TActorId& clientId) { + auto tabletIt = TabletIdToShardIdx.find(tabletId); + if (tabletIt == TabletIdToShardIdx.end()) + return; // just sanity check + const auto& shardIdx = tabletIt->second; + + auto it = RunningBorrowedCompactions.find(shardIdx); + if (it == RunningBorrowedCompactions.end()) + return; + + if (it->second != clientId) + return; + + RunningBorrowedCompactions.erase(it); + + // disonnected from node we have requested borrowed compaction. We just resend request, because it + // is safe: if first request is executing or has been already executed, then second request will be ignored. + + StartBorrowedCompaction(shardIdx); +} + +void TSchemeShard::EnqueueBorrowedCompaction(const TShardIdx& shardIdx) { + if (!BorrowedCompactionQueue) + return; + + auto ctx = TActivationContext::ActorContextFor(SelfId()); + + LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "borrowed compaction enqueue shard# " << shardIdx << " at schemeshard " << TabletID()); + + BorrowedCompactionQueue->Enqueue(shardIdx); + UpdateBorrowedCompactionQueueMetrics(); +} + +void TSchemeShard::RemoveBorrowedCompaction(const TShardIdx& shardIdx) { + if (!BorrowedCompactionQueue) + return; + + RunningBorrowedCompactions.erase(shardIdx); + BorrowedCompactionQueue->Remove(shardIdx); + UpdateBorrowedCompactionQueueMetrics(); +} + +void TSchemeShard::UpdateBorrowedCompactionQueueMetrics() { + if (!BorrowedCompactionQueue) + return; + + TabletCounters->Simple()[COUNTER_BORROWED_COMPACTION_QUEUE_SIZE].Set(BorrowedCompactionQueue->Size()); + TabletCounters->Simple()[COUNTER_BORROWED_COMPACTION_QUEUE_RUNNING].Set(BorrowedCompactionQueue->RunningSize()); +} + +void TSchemeShard::Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx) { + const auto& record = ev->Get()->Record; + + const TTabletId tabletId(record.GetTabletId()); + const TShardIdx shardIdx = GetShardIdx(tabletId); + + auto pathId = TPathId( + record.GetPathId().GetOwnerId(), + record.GetPathId().GetLocalId()); + + auto duration = BorrowedCompactionQueue->OnDone(shardIdx); + + if (shardIdx == InvalidShardIdx) { + LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished borrowed compaction of unknown shard " + "for pathId# " << pathId << ", datashard# " << tabletId + << " in# " << duration.MilliSeconds() + << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() + << ", rate# " << BorrowedCompactionQueue->GetRate() + << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" + << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" + << " at schemeshard " << TabletID()); + } else { + LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished borrowed compaction " + "for pathId# " << pathId << ", datashard# " << tabletId + << ", shardIdx# " << shardIdx + << " in# " << duration.MilliSeconds() + << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() + << ", rate# " << BorrowedCompactionQueue->GetRate() + << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" + << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" + << " at schemeshard " << TabletID()); + } + + RunningBorrowedCompactions.erase(shardIdx); + + TabletCounters->Cumulative()[COUNTER_BORROWED_COMPACTION_OK].Increment(1); + UpdateBorrowedCompactionQueueMetrics(); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index d16e5b0ae8..810be23a9d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -1,7 +1,6 @@ #include "schemeshard_impl.h" -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard { NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardCompactionInfo& info) { UpdateBackgroundCompactionQueueMetrics(); @@ -24,7 +23,7 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction " "for pathId# " << pathId << ", datashard# " << datashardId << ", compactionInfo# " << info - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" @@ -65,7 +64,7 @@ void TSchemeShard::OnBackgroundCompactionTimeout(const TShardCompactionInfo& inf LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout " "for pathId# " << pathId << ", datashard# " << datashardId << ", compactionInfo# " << info - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" @@ -94,7 +93,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard " "for pathId# " << pathId << ", datashard# " << tabletId << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" @@ -105,7 +104,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T "for pathId# " << pathId << ", datashard# " << tabletId << ", shardIdx# " << shardIdx << " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus() - << ", next wakeup# " << CompactionQueue->GetWakeupTime() + << ", next wakeup in# " << CompactionQueue->GetWakeupDelta() << ", rate# " << CompactionQueue->GetRate() << ", in queue# " << CompactionQueue->Size() << " shards" << ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards" @@ -138,7 +137,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T UpdateBackgroundCompactionQueueMetrics(); } -void TSchemeShard::EnqueueCompaction( +void TSchemeShard::EnqueueBackgroundCompaction( const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats) { @@ -173,7 +172,7 @@ void TSchemeShard::EnqueueCompaction( UpdateBackgroundCompactionQueueMetrics(); } -void TSchemeShard::UpdateCompaction( +void TSchemeShard::UpdateBackgroundCompaction( const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats) { @@ -186,7 +185,7 @@ void TSchemeShard::UpdateCompaction( LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "background compaction update removed shard# " << shardIdx << " with borrowed parts at schemeshard " << TabletID()); - RemoveCompaction(shardIdx); + RemoveBackgroundCompaction(shardIdx); return; } @@ -194,7 +193,7 @@ void TSchemeShard::UpdateCompaction( LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "background compaction update removed shard# " << shardIdx << " with loaned parts at schemeshard " << TabletID()); - RemoveCompaction(shardIdx); + RemoveBackgroundCompaction(shardIdx); return; } @@ -212,7 +211,7 @@ void TSchemeShard::UpdateCompaction( UpdateBackgroundCompactionQueueMetrics(); } -void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) { +void TSchemeShard::RemoveBackgroundCompaction(const TShardIdx& shardIdx) { if (!CompactionQueue) return; @@ -221,11 +220,15 @@ void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) { } void TSchemeShard::ShardRemoved(const TShardIdx& shardIdx) { - RemoveCompaction(shardIdx); + RemoveBackgroundCompaction(shardIdx); + RemoveBorrowedCompaction(shardIdx); RemoveShardMetrics(shardIdx); } void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() { + if (!CompactionQueue) + return; + TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_SIZE].Set(CompactionQueue->Size()); TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_RUNNING].Set(CompactionQueue->RunningSize()); TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_WAITING_REPEAT].Set(CompactionQueue->WaitingSize()); @@ -300,5 +303,4 @@ void TSchemeShard::RemoveShardMetrics(const TShardIdx& shardIdx) { PartitionMetricsMap.erase(it); } -} // NSchemeShard -} // NKikimr +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 1945d3cfed..1da7c610a4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -380,7 +380,7 @@ void NTableState::UpdatePartitioningForCopyTable(TOperationId operationId, TTxSt NIceDb::TNiceDb db(context.GetDB()); // Erase previous partitioning as we are going to generate new one - context.SS->DeleteTablePartitioning(db, txState.TargetPathId, dstTableInfo); + context.SS->PersistTablePartitioningDeletion(db, txState.TargetPathId, dstTableInfo); // Remove old shardIdx info and old txShards for (const auto& shard : txState.Shards) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp index 9a4bfc7daf..055525640f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp @@ -290,10 +290,11 @@ public: auto oldAggrStats = tableInfo->GetStats().Aggregated; // Delete the whole old partitioning and persist the whole new partitionig as the indexes have changed - context.SS->DeleteTablePartitioning(db, tableId, tableInfo); + context.SS->PersistTablePartitioningDeletion(db, tableId, tableInfo); context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning)); context.SS->PersistTablePartitioning(db, tableId, tableInfo); context.SS->PersistTablePartitionStats(db, tableId, tableInfo); + context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_ACTIVE_COUNT].Sub(allSrcShardIdxs.size()); context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_INACTIVE_COUNT].Add(allSrcShardIdxs.size()); diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index a464fc39f9..065836b85d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -194,10 +194,14 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte table->UpdateShardStats(shardIdx, newStats); if (!table->IsBackup) { - Self->UpdateCompaction(shardIdx, newStats); + Self->UpdateBackgroundCompaction(shardIdx, newStats); Self->UpdateShardMetrics(shardIdx, newStats); } + if (!newStats.HasBorrowedData) { + Self->RemoveBorrowedCompaction(shardIdx); + } + NIceDb::TNiceDb db(txc.DB); if (!table->IsBackup && !table->IsShardsStatsDetached()) { @@ -333,7 +337,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte if (newStats.HasBorrowedData) { // We don't want to split shards that have borrow parts // We must ask them to compact first - CompactEv.Reset(new TEvDataShard::TEvCompactBorrowed(tableId)); + Self->EnqueueBorrowedCompaction(shardIdx); return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 62a50500e0..83c4b64bfc 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -90,7 +90,7 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx, ScheduleCleanDroppedPaths(); ScheduleCleanDroppedSubDomains(); - StartStopCompactionQueue(); + StartStopCompactionQueues(); ctx.Send(TxAllocatorClient, MakeHolder<TEvTxAllocatorClient::TEvAllocate>(InitiateCachedTxIdsCount)); @@ -285,10 +285,17 @@ TMessageSeqNo TSchemeShard::NextRound() { void TSchemeShard::Clear() { PathsById.clear(); Tables.clear(); + if (CompactionQueue) { CompactionQueue->Clear(); UpdateBackgroundCompactionQueueMetrics(); } + + if (BorrowedCompactionQueue) { + BorrowedCompactionQueue->Clear(); + UpdateBorrowedCompactionQueueMetrics(); + } + ShardsWithBorrowed.clear(); ShardsWithLoaned.clear(); PersQueueGroups.clear(); @@ -2078,7 +2085,7 @@ void TSchemeShard::PersistTablePartitioning(NIceDb::TNiceDb& db, const TPathId p } } -void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) { +void TSchemeShard::PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) { const auto& partitions = tableInfo->GetPartitions(); for (ui64 pi = 0; pi < partitions.size(); ++pi) { if (IsLocalId(pathId)) { @@ -2086,8 +2093,6 @@ void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pa } db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete(); db.Table<Schema::TablePartitionStats>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete(); - - ShardRemoved(partitions[pi].ShardIdx); } } @@ -3679,6 +3684,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info) new TPipeClientFactory(this))) , PipeTracker(*PipeClientCache) , CompactionStarter(this) + , BorrowedCompactionStarter(this) , ShardDeleter(info->TabletID) , AllowDataColumnForIndexTable(0, 0, 1) , EnableAsyncIndexes(0, 0, 1) @@ -3754,6 +3760,9 @@ void TSchemeShard::Die(const TActorContext &ctx) { if (CompactionQueue) CompactionQueue->Shutdown(ctx); + if (BorrowedCompactionQueue) + BorrowedCompactionQueue->Shutdown(ctx); + return IActor::Die(ctx); } @@ -3789,10 +3798,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { EnableBackgroundCompaction = appData->FeatureFlags.GetEnableBackgroundCompaction(); EnableBackgroundCompactionServerless = appData->FeatureFlags.GetEnableBackgroundCompactionServerless(); - if (!CompactionQueue) - ConfigureCompactionQueue(appData->CompactionConfig.GetBackgroundCompactionConfig(), ctx); - ctx.RegisterWithSameMailbox(CompactionQueue); + ConfigureCompactionQueues(appData->CompactionConfig, ctx); if (appData->ChannelProfiles) { ChannelProfiles = appData->ChannelProfiles; @@ -3973,6 +3980,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvSchemeShard::TEvMigrateSchemeShardResult, Handle); HFuncTraced(TEvDataShard::TEvMigrateSchemeShardResponse, Handle); HFuncTraced(TEvDataShard::TEvCompactTableResult, Handle); + HFuncTraced(TEvDataShard::TEvCompactBorrowedResult, Handle); HFuncTraced(TEvSchemeShard::TEvSyncTenantSchemeShard, Handle); HFuncTraced(TEvSchemeShard::TEvUpdateTenantSchemeShard, Handle); @@ -4613,6 +4621,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc << ", to tablet: " << tabletId << ", at schemeshard: " << TabletID()); + BorrowedCompactionHandleDisconnect(tabletId, clientId); RestartPipeTx(tabletId, ctx); } @@ -4650,6 +4659,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc return; } + BorrowedCompactionHandleDisconnect(tabletId, clientId); RestartPipeTx(tabletId, ctx); } @@ -5826,14 +5836,32 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T } if (!tableInfo->IsBackup) { + // partitions updated: + // 1. We need to remove some parts from compaction queues. + // 2. We need to add new ones to the queues. Since we can safely enqueue already + // enqueued parts, we simple enqueue each part in newPartitioning + THashSet<TShardIdx> newPartitioningSet; + newPartitioningSet.reserve(newPartitioning.size()); + const auto& oldPartitioning = tableInfo->GetPartitions(); + for (const auto& p: newPartitioning) { + if (!oldPartitioning.empty()) + newPartitioningSet.insert(p.ShardIdx); + const auto& partitionStats = tableInfo->GetStats().PartitionStats; auto it = partitionStats.find(p.ShardIdx); if (it != partitionStats.end()) { - EnqueueCompaction(p.ShardIdx, it->second); + EnqueueBackgroundCompaction(p.ShardIdx, it->second); UpdateShardMetrics(p.ShardIdx, it->second); } } + + for (const auto& p: oldPartitioning) { + if (!newPartitioningSet.contains(p.ShardIdx)) { + // note that queues might not contain the shard + ShardRemoved(p.ShardIdx); + } + } } tableInfo->SetPartitioning(std::move(newPartitioning)); @@ -5984,13 +6012,11 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi if (appConfig.HasCompactionConfig()) { const auto& compactionConfig = appConfig.GetCompactionConfig(); - if (compactionConfig.HasBackgroundCompactionConfig()) { - ConfigureCompactionQueue(compactionConfig.GetBackgroundCompactionConfig(), ctx); - } + ConfigureCompactionQueues(compactionConfig, ctx); } if (IsShemeShardConfigured()) { - StartStopCompactionQueue(); + StartStopCompactionQueues(); } } @@ -6017,7 +6043,24 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu EnableBackgroundCompactionServerless = featureFlags.GetEnableBackgroundCompactionServerless(); } -void TSchemeShard::ConfigureCompactionQueue( +void TSchemeShard::ConfigureCompactionQueues( + const NKikimrConfig::TCompactionConfig& compactionConfig, + const TActorContext &ctx) +{ + if (compactionConfig.HasBackgroundCompactionConfig()) { + ConfigureBackgroundCompactionQueue(compactionConfig.GetBackgroundCompactionConfig(), ctx); + } else { + ConfigureBackgroundCompactionQueue(NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig(), ctx); + } + + if (compactionConfig.HasBorrowedCompactionConfig()) { + ConfigureBorrowedCompactionQueue(compactionConfig.GetBorrowedCompactionConfig(), ctx); + } else { + ConfigureBorrowedCompactionQueue(NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig(), ctx); + } +} + +void TSchemeShard::ConfigureBackgroundCompactionQueue( const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config, const TActorContext &ctx) { @@ -6048,10 +6091,11 @@ void TSchemeShard::ConfigureCompactionQueue( compactionConfig, queueConfig, CompactionStarter); + ctx.RegisterWithSameMailbox(CompactionQueue); } LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "CompactionQueue configured: Timeout# " << compactionConfig.Timeout + "BackgroundCompactionQueue configured: Timeout# " << compactionConfig.Timeout << ", compact single parted# " << (queueConfig.CompactSinglePartedShards ? "yes" : "no") << ", Rate# " << CompactionQueue->GetRate() << ", WakeupInterval# " << compactionConfig.WakeupInterval @@ -6061,7 +6105,34 @@ void TSchemeShard::ConfigureCompactionQueue( << ", MaxRate# " << compactionConfig.MaxRate); } -void TSchemeShard::StartStopCompactionQueue() { +void TSchemeShard::ConfigureBorrowedCompactionQueue( + const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config, + const TActorContext &ctx) +{ + TBorrowedCompactionQueue::TConfig compactionConfig; + + compactionConfig.IsCircular = false; + compactionConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); + compactionConfig.InflightLimit = config.GetInflightLimit(); + compactionConfig.MaxRate = config.GetMaxRate(); + + if (BorrowedCompactionQueue) { + BorrowedCompactionQueue->UpdateConfig(compactionConfig); + } else { + BorrowedCompactionQueue = new TBorrowedCompactionQueue( + compactionConfig, + BorrowedCompactionStarter); + ctx.RegisterWithSameMailbox(BorrowedCompactionQueue); + } + + LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "BorrowedCompactionQueue configured: Timeout# " << compactionConfig.Timeout + << ", Rate# " << BorrowedCompactionQueue->GetRate() + << ", WakeupInterval# " << compactionConfig.WakeupInterval + << ", InflightLimit# " << compactionConfig.InflightLimit); +} + +void TSchemeShard::StartStopCompactionQueues() { // note, that we don't need to check current state of compaction queue if (IsServerlessDomain(TPath::Init(RootPathId(), this))) { if (EnableBackgroundCompactionServerless) { @@ -6076,6 +6147,8 @@ void TSchemeShard::StartStopCompactionQueue() { CompactionQueue->Stop(); } } + + BorrowedCompactionQueue->Start(); } void TSchemeShard::Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr &, const TActorContext &ctx) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 99df4e7b65..e7fcebbeed 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -99,6 +99,30 @@ private: TSchemeShard* Self; }; + using TBorrowedCompactionQueue = NOperationQueue::TOperationQueueWithTimer< + TShardIdx, + TFifoQueue<TShardIdx>, + TEvPrivate::EvRunBorrowedCompaction, + NKikimrServices::FLAT_TX_SCHEMESHARD>; + + class TBorrowedCompactionStarter : public TBorrowedCompactionQueue::IStarter { + public: + TBorrowedCompactionStarter(TSchemeShard* self) + : Self(self) + { } + + NOperationQueue::EStartStatus StartOperation(const TShardIdx& shardIdx) override { + return Self->StartBorrowedCompaction(shardIdx); + } + + void OnTimeout(const TShardIdx& shardIdx) override { + Self->OnBorrowedCompactionTimeout(shardIdx); + } + + private: + TSchemeShard* Self; + }; + public: static constexpr ui32 DefaultPQTabletPartitionsCount = 1; static constexpr ui32 MaxPQTabletPartitionsCount = 1000; @@ -207,6 +231,13 @@ public: TCompactionStarter CompactionStarter; TCompactionQueue* CompactionQueue = nullptr; + + TBorrowedCompactionStarter BorrowedCompactionStarter; + TBorrowedCompactionQueue* BorrowedCompactionQueue = nullptr; + + // shardIdx -> clientId + THashMap<TShardIdx, TActorId> RunningBorrowedCompactions; + THashSet<TShardIdx> ShardsWithBorrowed; // shards have parts from another shards THashSet<TShardIdx> ShardsWithLoaned; // shards have parts loaned to another shards bool EnableBackgroundCompaction = false; @@ -298,10 +329,19 @@ public: void ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfig, const TActorContext& ctx); void ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featureFlags, const TActorContext& ctx); - void ConfigureCompactionQueue( + void ConfigureCompactionQueues( + const NKikimrConfig::TCompactionConfig& config, + const TActorContext &ctx); + + void ConfigureBackgroundCompactionQueue( const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config, const TActorContext &ctx); - void StartStopCompactionQueue(); + + void ConfigureBorrowedCompactionQueue( + const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config, + const TActorContext &ctx); + + void StartStopCompactionQueues(); bool ApplyStorageConfig(const TStoragePools& storagePools, const NKikimrSchemeOp::TStorageConfig& storageConfig, @@ -464,7 +504,7 @@ public: void PersistTable(NIceDb::TNiceDb &db, const TPathId pathId); void PersistChannelsBinding(NIceDb::TNiceDb& db, const TShardIdx shardId, const TChannelsBindings& bindedChannels); void PersistTablePartitioning(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo); - void DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo); + void PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo); void PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPathId& pathId, ui64 id, const TTableInfo::TPtr tableInfo); void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TTableInfo::TPartitionStats& stats); void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TShardIdx& shardIdx, const TTableInfo::TPtr tableInfo); @@ -640,9 +680,12 @@ public: void ScheduleCleanDroppedPaths(); void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx); - void EnqueueCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); - void UpdateCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); - void RemoveCompaction(const TShardIdx& shardIdx); + void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); + void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); + void RemoveBackgroundCompaction(const TShardIdx& shardIdx); + + void EnqueueBorrowedCompaction(const TShardIdx& shardIdx); + void RemoveBorrowedCompaction(const TShardIdx& shardIdx); void UpdateShardMetrics(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats); void RemoveShardMetrics(const TShardIdx& shardIdx); @@ -653,6 +696,11 @@ public: void OnBackgroundCompactionTimeout(const TShardCompactionInfo& info); void UpdateBackgroundCompactionQueueMetrics(); + NOperationQueue::EStartStatus StartBorrowedCompaction(const TShardIdx& shardIdx); + void OnBorrowedCompactionTimeout(const TShardIdx& shardIdx); + void BorrowedCompactionHandleDisconnect(TTabletId tabletId, const TActorId& clientId); + void UpdateBorrowedCompactionQueueMetrics(); + struct TTxCleanDroppedSubDomains; NTabletFlatExecutor::ITransaction* CreateTxCleanDroppedSubDomains(); @@ -817,6 +865,7 @@ public: void Handle(TEvSchemeShard::TEvMigrateSchemeShardResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvMigrateSchemeShardResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx); + void Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx); void Handle(TEvSchemeShard::TEvSyncTenantSchemeShard::TPtr& ev, const TActorContext& ctx); void Handle(TEvSchemeShard::TEvUpdateTenantSchemeShard::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 207fd8d2c7..751de0231e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -21,6 +21,7 @@ struct TEvPrivate { EvSubscribeToShardDeletion, EvNotifyShardDeleted, EvRunBackgroundCompaction, + EvRunBorrowedCompaction, EvCompletePublication, EvCompleteBarrier, EvEnd diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp index 8d768d3aa0..4da2c1e22b 100644 --- a/ydb/core/tx/schemeshard/ut_compaction.cpp +++ b/ydb/core/tx/schemeshard/ut_compaction.cpp @@ -12,6 +12,9 @@ using namespace NSchemeShardUT_Private; namespace { +constexpr TDuration DefaultTimeout = TDuration::Seconds(30); +constexpr TDuration RetryDelay = TDuration::Seconds(1); + using TTableInfoMap = THashMap<TString, NKikimrTxDataShard::TEvGetInfoResponse::TUserTable>; TShardCompactionInfo MakeCompactionInfo(ui64 idx, ui64 ts, ui64 sh = 0, ui64 d = 0) { @@ -70,25 +73,13 @@ TPathInfo GetPathInfo( return info; } -void CreateTableWithData( +void WriteData( TTestActorRuntime &runtime, - TTestEnv& env, - const char* path, const char* name, - ui32 shardsCount, - ui64& txId, - ui64 schemeshardId = TTestTxConfig::SchemeShard) + ui64 fromKeyInclusive, + ui64 toKey, + ui64 tabletId = TTestTxConfig::FakeHiveTablets) { - TestCreateTable(runtime, schemeshardId, ++txId, path, - Sprintf(R"____( - Name: "%s" - Columns { Name: "key" Type: "Uint64"} - Columns { Name: "value" Type: "Utf8"} - KeyColumnNames: ["key"] - UniformPartitionsCount: %d - )____", name, shardsCount)); - env.TestWaitNotification(runtime, txId, schemeshardId); - auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) { TString writeQuery = Sprintf(R"( ( @@ -104,37 +95,82 @@ void CreateTableWithData( UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);; }; - for (ui64 key = 0; key < 100; ++key) { - fnWriteRow(TTestTxConfig::FakeHiveTablets, key, name); + for (ui64 key = fromKeyInclusive; key < toKey; ++key) { + fnWriteRow(tabletId, key, name); } } -void SetFeatures( +void CreateTableWithData( + TTestActorRuntime &runtime, + TTestEnv& env, + const char* path, + const char* name, + ui32 shardsCount, + ui64& txId, + ui64 schemeshardId = TTestTxConfig::SchemeShard) +{ + TestCreateTable(runtime, schemeshardId, ++txId, path, + Sprintf(R"____( + Name: "%s" + Columns { Name: "key" Type: "Uint64"} + Columns { Name: "value" Type: "Utf8"} + KeyColumnNames: ["key"] + UniformPartitionsCount: %d + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: %d + MaxPartitionsCount: %d + } + } + )____", name, shardsCount, shardsCount, shardsCount)); + env.TestWaitNotification(runtime, txId, schemeshardId); + + WriteData(runtime, name, 0, 100); +} + +void SetConfig( TTestActorRuntime &runtime, - TTestEnv&, ui64 schemeShard, - const NKikimrConfig::TFeatureFlags& features) + THolder<NConsole::TEvConsole::TEvConfigNotificationRequest> request) { + auto sender = runtime.AllocateEdgeActor(); + + runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + + TAutoPtr<IEventHandle> handle; + runtime.GrabEdgeEventRethrow<NConsole::TEvConsole::TEvConfigNotificationResponse>(handle); +} + +THolder<NConsole::TEvConsole::TEvConfigNotificationRequest> GetTestCompactionConfig() { auto request = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>(); - *request->Record.MutableConfig()->MutableFeatureFlags() = features; - // little hack to simplify life + // little hacks to simplify life auto* compactionConfig = request->Record.MutableConfig()->MutableCompactionConfig(); compactionConfig->MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig->MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + compactionConfig->MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds()); + compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds()); // 1 compaction / second compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); compactionConfig->MutableBackgroundCompactionConfig()->SetMaxRate(1); compactionConfig->MutableBackgroundCompactionConfig()->SetRoundSeconds(0); - auto sender = runtime.AllocateEdgeActor(); + compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); - runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + return request; +} - TAutoPtr<IEventHandle> handle; - runtime.GrabEdgeEventRethrow<NConsole::TEvConsole::TEvConfigNotificationResponse>(handle); +void SetFeatures( + TTestActorRuntime &runtime, + TTestEnv&, + ui64 schemeShard, + const NKikimrConfig::TFeatureFlags& features) +{ + auto request = GetTestCompactionConfig(); + *request->Record.MutableConfig()->MutableFeatureFlags() = features; + SetConfig(runtime, schemeShard, std::move(request)); } void SetBackgroundCompactionServerless(TTestActorRuntime &runtime, TTestEnv& env, ui64 schemeShard, bool value) { @@ -163,12 +199,16 @@ void DisableBackgroundCompactionViaRestart( compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds()); + compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds()); // 1 compaction / second compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1); compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0); + compactionConfig.MutableBorrowedCompactionConfig()->SetInflightLimit(1); + TActorId sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, schemeShard, sender); } @@ -188,12 +228,16 @@ void EnableBackgroundCompactionViaRestart( compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true); + compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds()); + compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds()); // 1 compaction / second compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0); compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1); compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0); + compactionConfig.MutableBorrowedCompactionConfig()->SetInflightLimit(1); + TActorId sender = runtime.AllocateEdgeActor(); RebootTablet(runtime, schemeShard, sender); } @@ -201,17 +245,20 @@ void EnableBackgroundCompactionViaRestart( struct TCompactionStats { ui64 BackgroundRequestCount = 0; ui64 BackgroundCompactionCount = 0; + ui64 CompactBorrowedCount = 0; TCompactionStats() = default; TCompactionStats(const NKikimrTxDataShard::TEvGetCompactTableStatsResult& stats) : BackgroundRequestCount(stats.GetBackgroundCompactionRequests()) , BackgroundCompactionCount(stats.GetBackgroundCompactionCount()) + , CompactBorrowedCount(stats.GetCompactBorrowedCount()) {} void Update(const TCompactionStats& other) { BackgroundRequestCount += other.BackgroundRequestCount; BackgroundCompactionCount += other.BackgroundCompactionCount; + CompactBorrowedCount += other.CompactBorrowedCount; } }; @@ -267,12 +314,41 @@ TCompactionStats GetCompactionStats( info.OwnerId); } -void CheckShardCompacted( +void CheckShardBorrowedCompacted( + TTestActorRuntime &runtime, + const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, + ui64 tabletId, + ui64 ownerId) +{ + auto count = GetCompactionStats( + runtime, + userTable, + tabletId, + ownerId).CompactBorrowedCount; + + UNIT_ASSERT(count > 0); +} + +void CheckShardNotBorrowedCompacted( + TTestActorRuntime &runtime, + const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, + ui64 tabletId, + ui64 ownerId) +{ + auto count = GetCompactionStats( + runtime, + userTable, + tabletId, + ownerId).CompactBorrowedCount; + + UNIT_ASSERT_VALUES_EQUAL(count, 0UL); +} + +void CheckShardBackgroundCompacted( TTestActorRuntime &runtime, const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, ui64 tabletId, - ui64 ownerId, - bool shouldCompacted = true) + ui64 ownerId) { auto count = GetCompactionStats( runtime, @@ -280,14 +356,25 @@ void CheckShardCompacted( tabletId, ownerId).BackgroundRequestCount; - if (shouldCompacted) { - UNIT_ASSERT(count > 0); - } else { - UNIT_ASSERT_VALUES_EQUAL(count, 0UL); - } + UNIT_ASSERT(count > 0); } -void CheckNoCompactionsInPeriod( +void CheckShardNotBackgroundCompacted( + TTestActorRuntime &runtime, + const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, + ui64 tabletId, + ui64 ownerId) +{ + auto count = GetCompactionStats( + runtime, + userTable, + tabletId, + ownerId).BackgroundRequestCount; + + UNIT_ASSERT_VALUES_EQUAL(count, 0UL); +} + +void CheckNoBackgroundCompactionsInPeriod( TTestActorRuntime &runtime, TTestEnv& env, const TString& path, @@ -329,8 +416,10 @@ void TestBackgroundCompaction( enableBackgroundCompactionFunc(runtime, env); env.SimulateSleep(runtime, TDuration::Seconds(30)); - for (auto shard: info.Shards) - CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId); + for (auto shard: info.Shards) { + CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); + CheckShardNotBorrowedCompacted(runtime, info.UserTable, shard, info.OwnerId); + } } ui64 TestServerless( @@ -421,8 +510,14 @@ ui64 TestServerless( env.SimulateSleep(runtime, TDuration::Seconds(30)); - for (auto shard: info.Shards) - CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId, enableServerless); + for (auto shard: info.Shards) { + if (enableServerless) + CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); + else + CheckShardNotBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); + + CheckShardNotBorrowedCompacted(runtime, info.UserTable, shard, info.OwnerId); + } return schemeshardId; } @@ -485,7 +580,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { // some time to finish compactions in progress env.SimulateSleep(runtime, TDuration::Seconds(30)); - CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); + CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); } Y_UNIT_TEST(ShouldNotCompactServerless) { @@ -527,7 +622,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { // some time to finish compactions in progress env.SimulateSleep(runtime, TDuration::Seconds(30)); - CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId); + CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId); } Y_UNIT_TEST(SchemeshardShouldNotCompactBackups) { @@ -556,7 +651,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); - CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); + CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL); } @@ -566,9 +661,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { TTestEnv env(runtime); runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); - //runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_TRACE); // disable for the case, when compaction is enabled by default SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false); @@ -604,11 +697,386 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); - CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); + CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL); // original table should not be compacted as well - CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); + CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); + } + + Y_UNIT_TEST(SchemeshardShouldHandleCompactionTimeouts) { + // note that this test is good to test TOperationQueueWithTimer + + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + + size_t compactionResultCount = 0; + + // capture original observer func by setting dummy one + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCompactTableResult: { + Y_UNUSED(ev.Release()); + ++compactionResultCount; + return TTestActorRuntime::EEventAction::DROP; + } + default: + return originalObserver(runtime, ev); + } + }); + ui64 txId = 1000; + + // note that we create 1-sharded table to avoid complications + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + while (compactionResultCount < 3UL) + env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); + } +}; + +Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) { + Y_UNIT_TEST(SchemeshardShouldCompactBorrowed) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + // in case it is not enabled by default + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + + auto configRequest = GetTestCompactionConfig(); + auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); + compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); + + SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + + ui64 txId = 1000; + + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 5, txId); + + { + // write to all shards in hacky way + auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + for (auto shard: simpleInfo.Shards) { + WriteData(runtime, "Simple", 0, 100, shard); + } + } + env.SimulateSleep(runtime, TDuration::Seconds(1)); + + // copy table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + CopyFromTable: "/MyRoot/Simple" + )"); + env.TestWaitNotification(runtime, txId); + + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + auto copyInfo = GetPathInfo(runtime, "/MyRoot/CopyTable"); + + // borrow compaction only runs when we split, so nothing should be borrow compacted yet + + { + for (auto shard: simpleInfo.Shards) { + CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); + } + } + + { + for (auto shard: copyInfo.Shards) { + CheckShardNotBorrowedCompacted(runtime, copyInfo.UserTable, shard, copyInfo.OwnerId); + } + } + + // now force split + + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 20 + MaxPartitionsCount: 20 + SizeToSplit: 1 + } + })"); + env.TestWaitNotification(runtime, txId); + + // schemeshard should get stats from DS to start borrower compactions + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + // should compact all borrowed data (note that background will not compact until then) + + { + for (auto shard: copyInfo.Shards) { + CheckShardBorrowedCompacted(runtime, copyInfo.UserTable, shard, copyInfo.OwnerId); + } + } + + { + // Simple again the only owner + for (auto shard: simpleInfo.Shards) { + CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); + } + } + + // now should be no borrower compactions, but background should do the job + + auto copyCount1 = GetCompactionStats(runtime, "/MyRoot/CopyTable").CompactBorrowedCount; + auto simpleCount1 = GetCompactionStats(runtime, "/MyRoot/Simple").CompactBorrowedCount; + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + { + auto info = GetPathInfo(runtime, "/MyRoot/Simple"); + for (auto shard: info.Shards) { + CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); + } + auto simpleCount2 = GetCompactionStats(runtime, "/MyRoot/Simple").CompactBorrowedCount; + UNIT_ASSERT_VALUES_EQUAL(simpleCount1, simpleCount2); + } + + { + auto info = GetPathInfo(runtime, "/MyRoot/CopyTable"); + for (auto shard: info.Shards) { + CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); + } + auto copyCount2 = GetCompactionStats(runtime, "/MyRoot/CopyTable").CompactBorrowedCount; + UNIT_ASSERT_VALUES_EQUAL(copyCount1, copyCount2); + } + } + + Y_UNIT_TEST(SchemeshardShouldHandleBorrowCompactionTimeouts) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto configRequest = GetTestCompactionConfig(); + auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); + compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); + compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(3); + + SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + + size_t borrowedRequests = 0; + + // capture original observer func by setting dummy one + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCompactBorrowed: { + Y_UNUSED(ev.Release()); + ++borrowedRequests; + return TTestActorRuntime::EEventAction::DROP; + } + default: + return originalObserver(runtime, ev); + } + }); + + ui64 txId = 1000; + + // note that we create 1-sharded table to avoid complications + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + // copy table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + CopyFromTable: "/MyRoot/Simple" + )"); + env.TestWaitNotification(runtime, txId); + + // now force split + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 2 + MaxPartitionsCount: 2 + SizeToSplit: 1 + } + })"); + env.TestWaitNotification(runtime, txId); + + // wait until DS reports that it has borrowed data + while (borrowedRequests < 1) { + env.SimulateSleep(runtime, TDuration::Seconds(1)); + } + + env.SimulateSleep(runtime, TDuration::Seconds(60)); + + while (borrowedRequests < 3) + env.SimulateSleep(runtime, TDuration::Seconds(10)); + } + + Y_UNIT_TEST(SchemeshardShouldHandleDataShardReboot) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto configRequest = GetTestCompactionConfig(); + auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); + compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); + compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(1000); // avoid timeouts + + // now we have 1 inflight which will hang the queue in case on long timeout + SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + + size_t borrowedRequests = 0; + + // capture original observer func by setting dummy one + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCompactBorrowed: { + Y_UNUSED(ev.Release()); + ++borrowedRequests; + return TTestActorRuntime::EEventAction::DROP; + } + default: + return originalObserver(runtime, ev); + } + }); + ui64 txId = 1000; + + // note that we create 1-sharded table to avoid complications + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + // copy table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + CopyFromTable: "/MyRoot/Simple" + )"); + env.TestWaitNotification(runtime, txId); + + // now force split + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 2 + MaxPartitionsCount: 2 + SizeToSplit: 1 + } + })"); + env.TestWaitNotification(runtime, txId); + + // wait until DS reports that it has borrowed data + while (borrowedRequests < 1) { + env.SimulateSleep(runtime, TDuration::Seconds(1)); + } + + env.SimulateSleep(runtime, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 1UL); + + env.SimulateSleep(runtime, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 1UL); + + auto info = GetPathInfo(runtime, "/MyRoot/CopyTable"); + UNIT_ASSERT_VALUES_EQUAL(info.Shards.size(), 1UL); + + // break the pipes and check that SS requested compaction again + TActorId sender = runtime.AllocateEdgeActor(); + RebootTablet(runtime, info.Shards[0], sender); + env.SimulateSleep(runtime, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 2UL); + + // one more time + RebootTablet(runtime, info.Shards[0], sender); + env.SimulateSleep(runtime, TDuration::Seconds(1)); + UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 3UL); + } + + Y_UNIT_TEST(SchemeshardShouldNotCompactAfterDrop) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto configRequest = GetTestCompactionConfig(); + auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); + compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); + compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(5); + + // now we have 1 inflight which will hang the queue in case on long timeout + SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + + size_t borrowedRequests = 0; + + // capture original observer func by setting dummy one + auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { + return TTestActorRuntime::EEventAction::PROCESS; + }); + // now set our observer backed up by original + runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::EvCompactBorrowed: { + Y_UNUSED(ev.Release()); + ++borrowedRequests; + return TTestActorRuntime::EEventAction::DROP; + } + default: + return originalObserver(runtime, ev); + } + }); + ui64 txId = 1000; + + // note that we create 1-sharded table to avoid complications + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + // copy table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + CopyFromTable: "/MyRoot/Simple" + )"); + env.TestWaitNotification(runtime, txId); + + // now force split + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "CopyTable" + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 2 + MaxPartitionsCount: 2 + SizeToSplit: 1 + } + })"); + env.TestWaitNotification(runtime, txId); + + // wait until DS reports that it has borrowed data + while (borrowedRequests < 1) { + env.SimulateSleep(runtime, TDuration::MilliSeconds(100)); + } + + auto requestsBefore = borrowedRequests; + + // SS waits reply from DS, drop the table meanwhile + TestDropTable(runtime, ++txId, "/MyRoot", "CopyTable"); + env.TestWaitNotification(runtime, txId); + env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets + 1); + + env.SimulateSleep(runtime, TDuration::Seconds(10)); // 2x timeout + UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, requestsBefore); } }; diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 617a1a7f33..4cc983a6d9 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -60,6 +60,7 @@ SRCS( defs.h schemeshard.h schemeshard.cpp + schemeshard__borrowed_compaction.cpp schemeshard__compaction.cpp schemeshard__clean_pathes.cpp schemeshard__conditional_erase.cpp diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h index 19c8750b49..61bed56bc3 100644 --- a/ydb/core/util/operation_queue.h +++ b/ydb/core/util/operation_queue.h @@ -4,7 +4,9 @@ #include "intrusive_heap.h" #include "token_bucket.h" -#include <library/cpp/time_provider/time_provider.h> +#include <ydb/core/base/defs.h> + +#include <library/cpp/actors/core/monotonic.h> #include <util/datetime/base.h> #include <util/generic/algorithm.h> @@ -26,9 +28,9 @@ enum class EStartStatus { class ITimer { public: // asks to call TOperationQueue::Wakeup() - virtual void SetWakeupTimer(TInstant t) = 0; + virtual void SetWakeupTimer(TDuration delta) = 0; - virtual TInstant Now() = 0; + virtual TMonotonic Now() = 0; }; template <typename T> @@ -195,18 +197,18 @@ public: struct TItemWithTs { T Item; - TInstant Timestamp; + TMonotonic Timestamp; explicit TItemWithTs(const T& item) : Item(item) { } - TItemWithTs(const T& item, TInstant s) + TItemWithTs(const T& item, TMonotonic s) : Item(item) , Timestamp(s) { } - TItemWithTs(T&& item, TInstant s) + TItemWithTs(T&& item, TMonotonic s) : Item(std::move(item)) , Timestamp(s) { } @@ -259,10 +261,10 @@ private: TRunningItems RunningItems; TWaitingItems WaitingItems; - TTokenBucket TokenBucket; + TTokenBucketBase<TMonotonic> TokenBucket; bool HasRateLimit = false; - TInstant NextWakeup; + TMonotonic NextWakeup; bool Running = false; bool WasRunning = false; @@ -527,8 +529,10 @@ TDuration TOperationQueue<T, TQueue>::OnDone(const T& item) { template <typename T, typename TQueue> void TOperationQueue<T, TQueue>::Wakeup() { + NextWakeup = {}; StartOperations(); - ScheduleWakeup(); + if (!NextWakeup) + ScheduleWakeup(); } template <typename T, typename TQueue> @@ -551,10 +555,19 @@ void TOperationQueue<T, TQueue>::CheckTimeoutOperations() { while (!RunningItems.Empty()) { const auto& item = RunningItems.Front(); if (item.Timestamp + Config.Timeout <= now) { - Starter.OnTimeout(item.Item); - if (Config.IsCircular) - ReEnqueueNoStart(std::move(item.Item)); + auto movedItem = std::move(item.Item); + + // we want to pop before calling OnTimeout, because + // it might want to enqueue in case of non-circular + // queue RunningItems.PopFront(); + + // note that OnTimeout() can enqueue item back + // in case of non-circular queue + Starter.OnTimeout(movedItem); + + if (Config.IsCircular) + ReEnqueueNoStart(std::move(movedItem)); continue; } break; @@ -622,17 +635,17 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { if (TokenBucket.Available() <= 0) { // we didn't start anything because of RPS limit NextWakeup = now + TokenBucket.NextAvailableDelay(); - Timer.SetWakeupTimer(NextWakeup); + Timer.SetWakeupTimer(TokenBucket.NextAvailableDelay()); return; - } else if (!NextWakeup || NextWakeup <= now) { + } else if (!NextWakeup) { // special case when we failed to start anything NextWakeup = now + Config.WakeupInterval; - Timer.SetWakeupTimer(NextWakeup); + Timer.SetWakeupTimer(Config.WakeupInterval); return; } } - auto wakeup = TInstant::Max(); + auto wakeup = TMonotonic::Max(); if (Config.Timeout && !RunningItems.Empty()) { const auto& item = RunningItems.Front(); @@ -644,7 +657,7 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { wakeup = Min(wakeup, item.Timestamp + Config.MinOperationRepeatDelay); } - if (wakeup == TInstant::Max()) + if (wakeup == TMonotonic::Max()) return; // no sense to wakeup earlier that rate limit allows @@ -652,9 +665,9 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() { wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay()); } - if (!NextWakeup || NextWakeup > wakeup || NextWakeup <= now) { + if (!NextWakeup || NextWakeup > wakeup) { NextWakeup = wakeup; - Timer.SetWakeupTimer(NextWakeup); + Timer.SetWakeupTimer(wakeup - now); } } diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp index 32bd274fe6..687350cc3b 100644 --- a/ydb/core/util/operation_queue_priority_ut.cpp +++ b/ydb/core/util/operation_queue_priority_ut.cpp @@ -2,6 +2,7 @@ #include "circular_queue.h" +#include <library/cpp/actors/core/monotonic_provider.h> #include <library/cpp/testing/unittest/registar.h> #include <util/generic/ptr.h> @@ -65,9 +66,9 @@ namespace { TDuration Timeout = TDuration::Minutes(10); -class TSimpleTimeProvider : public ITimeProvider { +class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider { public: - TInstant Now() override { + TMonotonic Now() override { return Now_; } @@ -75,12 +76,12 @@ public: Now_ += delta; } - void Move(TInstant now) { + void Move(TMonotonic now) { Now_ = now; } private: - TInstant Now_; + TMonotonic Now_; }; @@ -88,7 +89,7 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue TSimpleTimeProvider TimeProvider; TVector<TPriorityItem> StartHistory; - TVector<TInstant> WakeupHistory; + TVector<TMonotonic> WakeupHistory; NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning; @@ -98,15 +99,15 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue return StartResult; } - void SetWakeupTimer(TInstant t) override + void SetWakeupTimer(TDuration delta) override { - WakeupHistory.push_back(t); + WakeupHistory.push_back(this->Now() + delta); } void OnTimeout(const TPriorityItem&) override {} - TInstant Now() override + TMonotonic Now() override { return TimeProvider.Now(); } diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp index 1015cb217e..7daa22ba67 100644 --- a/ydb/core/util/operation_queue_ut.cpp +++ b/ydb/core/util/operation_queue_ut.cpp @@ -2,6 +2,7 @@ #include "circular_queue.h" +#include <library/cpp/actors/core/monotonic_provider.h> #include <library/cpp/testing/unittest/registar.h> #include <util/generic/ptr.h> @@ -14,9 +15,9 @@ namespace { TDuration Timeout = TDuration::Minutes(10); -class TSimpleTimeProvider : public ITimeProvider { +class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider { public: - TInstant Now() override { + TMonotonic Now() override { return Now_; } @@ -24,12 +25,12 @@ public: Now_ += delta; } - void Move(TInstant now) { + void Move(TMonotonic now) { Now_ = now; } private: - TInstant Now_; + TMonotonic Now_; }; using TQueue = TOperationQueue<int, TFifoQueue<int>>; @@ -38,7 +39,7 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim TSimpleTimeProvider TimeProvider; TVector<int> StartHistory; - TVector<TInstant> WakeupHistory; + TVector<TMonotonic> WakeupHistory; NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning; @@ -48,15 +49,15 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim return StartResult; } - void SetWakeupTimer(TInstant t) override + void SetWakeupTimer(TDuration delta) override { - WakeupHistory.push_back(t); + WakeupHistory.push_back(this->Now() + delta); } void OnTimeout(const int&) override {} - TInstant Now() override + TMonotonic Now() override { return TimeProvider.Now(); } @@ -68,7 +69,7 @@ void CheckQueue( TVector<TQueue::TItemWithTs> runningGold, TVector<int> inQueueGold, TVector<int> startHistory, - TVector<TInstant> wakeupHistory) + TVector<TMonotonic> wakeupHistory) { auto running = queue.GetRunning(); auto inQueue = queue.GetQueue(); @@ -107,7 +108,7 @@ void TestStartInflightBeforeStart(int inflight, int pushN = 10) { runningGold.push_back({i, now}); } - TVector<TInstant> wakeupsGold = + TVector<TMonotonic> wakeupsGold = { starter.TimeProvider.Now() + config.Timeout }; CheckQueue( @@ -131,7 +132,7 @@ void TestInflightWithEnqueue(int inflight, int pushN = 10) { TVector<TQueue::TItemWithTs> runningGold; TVector<int> queuedGold; - TVector<TInstant> wakeupsGold = + TVector<TMonotonic> wakeupsGold = { starter.TimeProvider.Now() + TDuration::Seconds(1) + config.Timeout }; int lastStarted = 0; @@ -988,6 +989,36 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL); UNIT_ASSERT(starter.WakeupHistory.back() > starter.TimeProvider.Now()); } + + Y_UNIT_TEST(ShouldTolerateInaccurateTimer) { + // should properly work when wokeup earlier than requested (i.e. properly set new timer) + // regression test: woke up earlier and didn't set new wakeup + + TQueue::TConfig config; + config.IsCircular = true; + config.InflightLimit = 1; + config.MaxRate = 0.0; + config.Timeout = Timeout; + TOperationStarter starter; + + TQueue queue(config, starter, starter); + queue.Start(); + + UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 0UL); + + queue.Enqueue(1); + + UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 1UL); + + // expect to wakeup on Timeout, but wakeup earlier + starter.TimeProvider.Move(Timeout - TDuration::Seconds(1)); + queue.Wakeup(); + + UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL); + UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL); + + UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 2UL); + } }; } // NOperationQueue diff --git a/ydb/core/util/token_bucket.h b/ydb/core/util/token_bucket.h index 060f8c75a6..8ed8475e53 100644 --- a/ydb/core/util/token_bucket.h +++ b/ydb/core/util/token_bucket.h @@ -6,16 +6,17 @@ namespace NKikimr { -class TTokenBucket { +template<typename TTime> +class TTokenBucketBase { double Tokens = 0.0; // tokens currenty in bucket double Rate = 0.0; // tokens filling rate [tokens/sec] double Capacity = 0.0; // maximum amount of tokens allowed in bucket - TInstant LastFill = TInstant::Zero(); + TTime LastFill; public: // Create unlimited bucket // NOTE: any bucket is created fully filled - TTokenBucket() { + TTokenBucketBase() { SetUnlimited(); } @@ -46,7 +47,7 @@ public: } // Fill bucket with tokens, should be done just before Take() - void Fill(TInstant now) { + void Fill(TTime now) { // NOTE: LastFill is allowed to be zero, the following code will work OK TDuration elapsed = now - LastFill; Tokens += elapsed.SecondsFloat() * Rate; @@ -62,7 +63,7 @@ public: } // Fill and take if available, returns taken amount - double FillAndTryTake(TInstant now, double amount) { + double FillAndTryTake(TTime now, double amount) { Fill(now); amount = Min(amount, Tokens); Take(amount); @@ -78,7 +79,7 @@ public: return TDuration::MicroSeconds(std::ceil(Available() * -1000000.0 / Rate)); } - TDuration FillAndNextAvailableDelay(TInstant now) { + TDuration FillAndNextAvailableDelay(TTime now) { Fill(now); return NextAvailableDelay(); } @@ -101,4 +102,6 @@ public: } }; +using TTokenBucket = TTokenBucketBase<TInstant>; + } // namespace NKikimr diff --git a/ydb/core/util/ya.make b/ydb/core/util/ya.make index 4f325d8fcb..91416afbc9 100644 --- a/ydb/core/util/ya.make +++ b/ydb/core/util/ya.make @@ -62,6 +62,7 @@ SRCS( ) PEERDIR( + library/cpp/actors/core library/cpp/actors/interconnect/mock library/cpp/actors/util library/cpp/containers/stack_vector |