diff options
| author | Evgeniy Ivanov <[email protected]> | 2022-05-24 19:24:16 +0300 | 
|---|---|---|
| committer | Evgeniy Ivanov <[email protected]> | 2022-05-24 19:24:16 +0300 | 
| commit | e62a28e708a9c85b312652ff618f8a64d17b2dbc (patch) | |
| tree | 0a4e8c246137833f4e06cb02a09b048bd9f19843 | |
| parent | 2816f26da8d88e8e58e41f73be11bff1b8940e11 (diff) | |
KIKIMR-14017: implement queue to compact borrowed data
ref:7e1f5e7962090631e847a6af16f21282d8f58ddc
28 files changed, 967 insertions, 147 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index e132127487e..bf0b02918dd 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1445,7 +1445,16 @@ message TCompactionConfig {          optional bool CompactSinglePartedShards = 10 [default = false];      } +    message TBorrowedCompactionConfig { +        optional double MaxRate = 1 [default = 0]; // unlimitted +        optional uint64 InflightLimit = 2 [default = 10]; // TODO: consider more? + +        // After this interval we will try to restart +        optional uint64 TimeoutSeconds = 3 [default = 15]; +    } +      optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1; +    optional TBorrowedCompactionConfig BorrowedCompactionConfig = 2;  }  // This message is used to upload custom service configs diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index bd0534b6515..d68c7d1387c 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -110,6 +110,7 @@ enum ECumulativeCounters {      COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START = 84    [(CounterOpts) = {Name: "TxCompactTableFailedStart"}];      COUNTER_FULL_COMPACTION_DONE = 85                     [(CounterOpts) = {Name: "FullCompactionCount"}];      COUNTER_TX_BACKGROUND_COMPACTION_FAILED_LOANED = 86   [(CounterOpts) = {Name: "TxCompactTableFailedLoaned"}]; +    COUNTER_TX_COMPACT_BORROWED = 87                      [(CounterOpts) = {Name: "TxCompactBorrowed"}];  }  enum EPercentileCounters { diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 4a2188d4fbd..570566e293d 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -155,6 +155,9 @@ enum ESimpleCounters {      COUNTER_COMPACTION_QUEUE_WAITING_REPEAT = 126         [(CounterOpts) = {Name: "BackgroundCompactionQueueWaitingRepeat"}];      
COUNTER_SHARDS_WITH_BORROWED_DATA = 127               [(CounterOpts) = {Name: "TableShardsWithBorrowedData"}];      COUNTER_SHARDS_WITH_LOANED_DATA = 128                 [(CounterOpts) = {Name: "TableShardsWithLoanedData"}]; + +    COUNTER_BORROWED_COMPACTION_QUEUE_SIZE = 129          [(CounterOpts) = {Name: "BorrowedCompactionQueueSize"}]; +    COUNTER_BORROWED_COMPACTION_QUEUE_RUNNING = 130       [(CounterOpts) = {Name: "BorrowedCompactionQueueRunning"}];  }  enum ECumulativeCounters { @@ -246,6 +249,9 @@ enum ECumulativeCounters {      COUNTER_BACKGROUND_COMPACTION_BORROWED = 73           [(CounterOpts) = {Name: "BackgroundCompactionBorrowed"}];      COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED = 74         [(CounterOpts) = {Name: "BackgroundCompactionNotNeeded"}];      COUNTER_BACKGROUND_COMPACTION_LOANED = 75             [(CounterOpts) = {Name: "BackgroundCompactionLoaned"}]; + +    COUNTER_BORROWED_COMPACTION_OK = 76                  [(CounterOpts) = {Name: "BorrowedCompactionOK"}]; +    COUNTER_BORROWED_COMPACTION_TIMEOUT = 77             [(CounterOpts) = {Name: "BorrowedCompactionTimeout"}];  }  enum EPercentileCounters { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index bcb9dff85a8..b43410e07a7 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1430,6 +1430,11 @@ message TEvCompactBorrowed {      optional NKikimrProto.TPathID PathId = 1;  } +message TEvCompactBorrowedResult { +    optional uint64 TabletId = 1; +    optional NKikimrProto.TPathID PathId = 2; +} +  message TEvGetCompactTableStats {      optional NKikimrProto.TPathID PathId = 1;  } @@ -1437,6 +1442,7 @@ message TEvGetCompactTableStats {  message TEvGetCompactTableStatsResult {      optional uint64 BackgroundCompactionRequests = 1;      optional uint64 BackgroundCompactionCount = 2; +    optional uint64 CompactBorrowedCount = 3;  }  // TEvRead is used to request multiple queries from a shard and diff --git 
a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 45cca5a0771..2090ee3a159 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -404,6 +404,11 @@ TAutoPtr<TSubset> TDatabase::ScanSnapshot(ui32 table, TRowVersion snapshot)      return Require(table)->ScanSnapshot(snapshot);  } +bool TDatabase::HasBorrowed(ui32 table, ui64 selfTabletId) const +{ +    return Require(table)->HasBorrowed(selfTabletId); +} +  TBundleSlicesMap TDatabase::LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const  {      return Require(table)->LookupSlices(bundles); diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index ee73c8300a0..2cea6d756a5 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -160,6 +160,8 @@ public:      TAutoPtr<TSubset> Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const;      TAutoPtr<TSubset> ScanSnapshot(ui32 table, TRowVersion snapshot = TRowVersion::Max()); +    bool HasBorrowed(ui32 table, ui64 selfTabletId) const; +      TBundleSlicesMap LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const;      void ReplaceSlices(ui32 table, TBundleSlicesMap slices); diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 4f6b5de3d1a..4b8fc706e76 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -3806,6 +3806,11 @@ bool TExecutor::HasLoanedParts() const {      return false;  } +bool TExecutor::HasBorrowed(ui32 table, ui64 selfTabletId) const { +    Y_VERIFY_S(Database, "Checking borrowers of table# " << table << " for tablet# " << selfTabletId); +    return Database->HasBorrowed(table, selfTabletId); +} +  const TExecutorStats& TExecutor::GetStats() const {      return *Stats;  } diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 
0424f09c7d9..5fd0e391d98 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -647,6 +647,8 @@ public:      THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const override;      bool HasLoanedParts() const override; +    bool HasBorrowed(ui32 table, ui64 selfTabletId) const override; +      const TExecutorStats& GetStats() const override;      NMetrics::TResourceMetrics* GetResourceMetrics() const override; diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index d5ce3ee8810..7ce3b7adb57 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -122,6 +122,23 @@ TAutoPtr<TSubset> TTable::Subset(TEpoch head) const noexcept      return subset;  } +bool TTable::HasBorrowed(ui64 selfTabletId) const noexcept +{ +    for (const auto &it : TxStatus) +        if (it.second->Label.TabletID() != selfTabletId) +            return true; + +    for (auto &it: Flatten) +        if (it.second->Label.TabletID() != selfTabletId) +            return true; + +    for (auto &it: ColdParts) +        if (it.second->Label.TabletID() != selfTabletId) +            return true; + +    return false; +} +  TAutoPtr<TSubset> TTable::ScanSnapshot(TRowVersion snapshot) noexcept  {      TAutoPtr<TSubset> subset = new TSubset(Epoch, Scheme); @@ -926,16 +943,6 @@ TCompactionStats TTable::GetCompactionStats() const      stats.MemRowCount = GetMemRowCount();      stats.MemDataSize = GetMemSize();      stats.MemDataWaste = GetMemWaste(); - -    for (auto &it: ColdParts) -        stats.PartOwners.insert(it.second->Label.TabletID()); - -    for (auto &it: Flatten) -        stats.PartOwners.insert(it.second->Label.TabletID()); - -    for (auto &it: TxStatus) -        stats.PartOwners.insert(it.second->Label.TabletID()); -      stats.PartCount = Flatten.size() + ColdParts.size();      return stats; diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 
90dab02babc..cb5095a185f 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -81,6 +81,8 @@ public:      TAutoPtr<TSubset> ScanSnapshot(TRowVersion snapshot = TRowVersion::Max()) noexcept;      TAutoPtr<TSubset> Unwrap() noexcept; /* full Subset(..) + final Replace(..) */ +    bool HasBorrowed(ui64 selfTabletId) const noexcept; +      /**       * Returns current slices for bundles       * diff --git a/ydb/core/tablet_flat/flat_table_stats.h b/ydb/core/tablet_flat/flat_table_stats.h index 4b632558b6f..a83e1f7bcbd 100644 --- a/ydb/core/tablet_flat/flat_table_stats.h +++ b/ydb/core/tablet_flat/flat_table_stats.h @@ -43,7 +43,6 @@ namespace NTable {      };      struct TCompactionStats { -        THashSet<ui64> PartOwners;          ui64 PartCount = 0;          ui64 MemRowCount = 0;          ui64 MemDataSize = 0; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 3c18a9ba871..8787bdb066f 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -542,6 +542,8 @@ namespace NFlatExecutorSetup {          virtual THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const = 0;          virtual bool HasLoanedParts() const = 0; +        virtual bool HasBorrowed(ui32 table, ui64 selfTabletId) const = 0; +          // This method lets executor know about new yellow channels          virtual void OnYellowChannels(TVector<ui32> yellowMoveChannels, TVector<ui32> yellowStopChannels) = 0; diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 06f357c8e97..b1ef4bad615 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -299,6 +299,8 @@ struct TEvDataShard {          EvReadAck,          EvReadCancel, +        EvCompactBorrowedResult, +          EvEnd      }; @@ -946,7 +948,7 @@ struct TEvDataShard {          // Arrow          std::shared_ptr<arrow::RecordBatch> 
ArrowBatch; -     +      private:          // for local events          TVector<TOwnedCellVec> Rows; @@ -1396,23 +1398,47 @@ struct TEvDataShard {      /**       * This message is used to ask datashard to compact any borrowed parts it has -     * for the specified user table. No reply is expected for this message, -     * instead schemeshard expects to receive updated stats if and when -     * such a compaction has finished. +     * for the specified user table.       */ -    struct TEvCompactBorrowed : public TEventPB<TEvCompactBorrowed, NKikimrTxDataShard::TEvCompactBorrowed, EvCompactBorrowed> { +    struct TEvCompactBorrowed : public TEventPB<TEvCompactBorrowed, +                                                NKikimrTxDataShard::TEvCompactBorrowed, +                                                EvCompactBorrowed> {          TEvCompactBorrowed() = default; -        TEvCompactBorrowed(const TPathId& pathId) { -            Record.MutablePathId()->SetOwnerId(pathId.OwnerId); -            Record.MutablePathId()->SetLocalId(pathId.LocalPathId); +        TEvCompactBorrowed(ui64 ownerId, ui64 localId) { +            Record.MutablePathId()->SetOwnerId(ownerId); +            Record.MutablePathId()->SetLocalId(localId);          } +        TEvCompactBorrowed(const TPathId& pathId) +            : TEvCompactBorrowed(pathId.OwnerId, pathId.LocalPathId) +        { } +          // Sanity check for safe merging to earlier versions          static_assert(EvCompactBorrowed == EventSpaceBegin(TKikimrEvents::ES_TX_DATASHARD) + 7 * 512 + 60,                      "EvCompactBorrowed event has an unexpected value");      }; +    struct TEvCompactBorrowedResult : public TEventPB<TEvCompactBorrowedResult, +                                                      NKikimrTxDataShard::TEvCompactBorrowedResult, +                                                      EvCompactBorrowedResult> { +        TEvCompactBorrowedResult() = default; + +        TEvCompactBorrowedResult(ui64 tabletId, 
ui64 ownerId, ui64 localId) { +            Record.SetTabletId(tabletId); +            Record.MutablePathId()->SetOwnerId(ownerId); +            Record.MutablePathId()->SetLocalId(localId); +        } + +        TEvCompactBorrowedResult(ui64 tabletId, const TPathId& pathId) +            : TEvCompactBorrowedResult(tabletId, pathId.OwnerId, pathId.LocalPathId) +        {} + +        // Sanity check for safe merging to earlier versions +        static_assert(EvCompactBorrowedResult == EventSpaceBegin(TKikimrEvents::ES_TX_DATASHARD) + 7 * 512 + 68, +                    "EvCompactBorrowedResult event has an unexpected value"); +    }; +      struct TEvGetCompactTableStats : public TEventPB<TEvGetCompactTableStats, NKikimrTxDataShard::TEvGetCompactTableStats,                                                       TEvDataShard::EvGetCompactTableStats> {          TEvGetCompactTableStats() = default; diff --git a/ydb/core/tx/datashard/datashard__compact_borrowed.cpp b/ydb/core/tx/datashard/datashard__compact_borrowed.cpp index cd9378a2133..cad476c8e52 100644 --- a/ydb/core/tx/datashard/datashard__compact_borrowed.cpp +++ b/ydb/core/tx/datashard/datashard__compact_borrowed.cpp @@ -21,38 +21,31 @@ public:              << " for table " << pathId              << " at tablet " << Self->TabletID()); +        auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(Self->TabletID(), pathId); +          if (pathId.OwnerId != Self->GetPathOwnerId()) {              // Ignore unexpected owner +            ctx.Send(Ev->Sender, std::move(response));              return true;          }          auto it = Self->TableInfos.find(pathId.LocalPathId);          if (it == Self->TableInfos.end()) {              // Ignore unexpected table (may normally happen with races) +            ctx.Send(Ev->Sender, std::move(response));              return true;          }          const TUserTable& tableInfo = *it->second; -        auto subset = txc.DB.Subset(tableInfo.LocalTid, 
NTable::TEpoch::Max(), {}, {}); - -        THashSet<ui64> partOwners; -        NTable::GetPartOwners(*subset, partOwners); - -        bool hasBorrowed = false; -        for (ui64 tabletId : partOwners) { -            if (tabletId != Self->TabletID()) { -                hasBorrowed = true; -                break; -            } -        } - +        bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID());          if (!hasBorrowed) {              LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,                  "TEvCompactBorrowed request from " << Ev->Sender                  << " for table " << pathId                  << " has no borrowed parts"                  << " at tablet " << Self->TabletID()); +            ctx.Send(Ev->Sender, std::move(response));              return true;          } @@ -61,7 +54,13 @@ public:              << " for table " << pathId              << " starting compaction for local table " << tableInfo.LocalTid              << " at tablet " << Self->TabletID()); +          Self->Executor()->CompactBorrowed(tableInfo.LocalTid); +        Self->IncCounter(COUNTER_TX_COMPACT_BORROWED); +        ++tableInfo.Stats.CompactBorrowedCount; + +        Self->CompactBorrowedWaiters[tableInfo.LocalTid].emplace_back(Ev->Sender); +          return true;      } diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp index 684ca76c7f4..de0720cdbae 100644 --- a/ydb/core/tx/datashard/datashard__compaction.cpp +++ b/ydb/core/tx/datashard/datashard__compaction.cpp @@ -66,17 +66,7 @@ public:          ++tableInfo.Stats.BackgroundCompactionRequests; -        auto stats = txc.DB.GetCompactionStats(localTid); - -        bool hasBorrowed = false; -        if (stats.PartOwners.size() > 1) { -            hasBorrowed = true; -        } else if (stats.PartOwners.size() == 1) { -            if (*stats.PartOwners.begin() != Self->TabletID()) { -                hasBorrowed = true; -            } -        
} - +        bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID());          if (hasBorrowed && !record.GetCompactBorrowed()) {              // normally we should not receive requests to compact in this case              // but in some rare cases like schemeshard restart we can @@ -115,6 +105,7 @@ public:              return true;          } +        auto stats = txc.DB.GetCompactionStats(localTid);          bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0;          bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0;          if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) { @@ -149,7 +140,7 @@ public:                  << ", memtableRows# " << stats.MemRowCount);              Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION); -            Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, pathId, Ev->Sender)); +            Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, Ev->Sender));              ++tableInfo.Stats.BackgroundCompactionCount;          } else {              // compaction failed, for now we don't care @@ -205,6 +196,7 @@ void TDataShard::Handle(TEvDataShard::TEvCompactTable::TPtr& ev, const TActorCon  void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) {      auto finishedInfo = Executor()->GetFinishedCompactionInfo(tableId); +    TLocalPathId localPathId = InvalidLocalPathId;      if (tableId >= Schema::MinLocalTid) {          for (auto& ti : TableInfos) {              if (ti.second->LocalTid != tableId && ti.second->ShadowTid != tableId) @@ -221,40 +213,68 @@ void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) {              }              ti.second->StatsNeedUpdate = true; +            localPathId = ti.first;              UpdateTableStats(ctx);              break;          }      } -    ReplyCompactionWaiters(tableId, 
finishedInfo.Edge, ctx); +    ReplyCompactionWaiters(tableId, localPathId, finishedInfo, ctx);  } -void TDataShard::ReplyCompactionWaiters(ui32 tableId, ui64 edge, const TActorContext &ctx) { +void TDataShard::ReplyCompactionWaiters( +    ui32 tableId, +    TLocalPathId localPathId, +    const NTabletFlatExecutor::TFinishedCompactionInfo& compactionInfo, +    const TActorContext &ctx) +{      LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,          "CompactionComplete of tablet# "<< TabletID()          << ", table# " << tableId -        << ", finished edge# " << edge +        << ", finished edge# " << compactionInfo.Edge          << ", front# " << (CompactionWaiters[tableId].empty() ? 0UL : std::get<0>(CompactionWaiters[tableId].front()))); -    while (!CompactionWaiters[tableId].empty()) { +    auto& fullCompactionQueue = CompactionWaiters[tableId]; +    while (!fullCompactionQueue.empty()) {          const auto& waiter = CompactionWaiters[tableId].front(); -        if (std::get<0>(waiter) > edge) +        if (std::get<0>(waiter) > compactionInfo.Edge)              break; -        const auto& pathId = std::get<1>(waiter); -        const auto& sender = std::get<2>(waiter); +        const auto& sender = std::get<1>(waiter);          auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>(              TabletID(), -            pathId.OwnerId, -            pathId.LocalPathId, +            GetPathOwnerId(), +            localPathId,              NKikimrTxDataShard::TEvCompactTableResult::OK);          ctx.Send(sender, std::move(response));          LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,              "Sending TEvCompactTableResult to# " << sender -            << "pathId# " << pathId); +            << "pathId# " << TPathId(GetPathOwnerId(), localPathId)); -        CompactionWaiters[tableId].pop_front(); +        fullCompactionQueue.pop_front(); +    } + +    auto& compactBorrowedQueue = CompactBorrowedWaiters[tableId]; +    if 
(!compactBorrowedQueue.empty()) { +        const bool hasBorrowed = Executor()->HasBorrowed(tableId, TabletID()); +        if (!hasBorrowed) { +            while (!compactBorrowedQueue.empty()) { +                const auto& waiter = compactBorrowedQueue.front(); + +                auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>( +                    TabletID(), +                    GetPathOwnerId(), +                    localPathId); +                ctx.Send(waiter, std::move(response)); + +                LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, +                    "Sending TEvCompactBorrowedResult to# " << waiter +                    << "pathId# " << TPathId(GetPathOwnerId(), localPathId)); + +                compactBorrowedQueue.pop_front(); +            } +        }      }  } @@ -270,6 +290,7 @@ void TDataShard::Handle(TEvDataShard::TEvGetCompactTableStats::TPtr& ev, const T          const TUserTable& tableInfo = *it->second;          response->Record.SetBackgroundCompactionRequests(tableInfo.Stats.BackgroundCompactionRequests);          response->Record.SetBackgroundCompactionCount(tableInfo.Stats.BackgroundCompactionCount); +        response->Record.SetCompactBorrowedCount(tableInfo.Stats.CompactBorrowedCount);      }      ctx.Send(ev->Sender, std::move(response)); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index a6b7dc530df..88f8b35bf90 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1309,7 +1309,11 @@ public:      void CompactionComplete(ui32 tableId, const TActorContext &ctx) override;      void CompletedLoansChanged(const TActorContext &ctx) override; -    void ReplyCompactionWaiters(ui32 tableId, ui64 edge, const TActorContext &ctx); +    void ReplyCompactionWaiters( +        ui32 tableId, +        TLocalPathId localPathId, +        const NTabletFlatExecutor::TFinishedCompactionInfo& compactionInfo, +        const 
TActorContext &ctx);      TUserTable::TSpecialUpdate SpecialUpdates(const NTable::TDatabase& db, const TTableId& tableId) const; @@ -2201,8 +2205,8 @@ private:      // in      THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id -    // compactionId, tableId/ownerId, actorId -    using TCompactionWaiter = std::tuple<ui64, TPathId, TActorId>; +    // compactionId, actorId +    using TCompactionWaiter = std::tuple<ui64, TActorId>;      using TCompactionWaiterList = TList<TCompactionWaiter>;      // tableLocalTid -> waiters, note that compactionId is monotonically @@ -2211,6 +2215,11 @@ private:      // from the front      THashMap<ui32, TCompactionWaiterList> CompactionWaiters; +    using TCompactBorrowedWaiterList = TList<TActorId>; + +    // tableLocalTid -> waiters, similar to CompactionWaiters +    THashMap<ui32, TCompactBorrowedWaiterList> CompactBorrowedWaiters; +      struct TReplicationSourceOffsetsReceiveState {          // A set of tables for which we already received offsets          THashSet<TPathId> Received; diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h index 6bbd8dec1a7..f3b6f8112c4 100644 --- a/ydb/core/tx/datashard/datashard_user_table.h +++ b/ydb/core/tx/datashard/datashard_user_table.h @@ -320,6 +320,7 @@ struct TUserTable : public TThrRefBase {          ui64 RowCountResolution = 0;          ui64 BackgroundCompactionRequests = 0;          ui64 BackgroundCompactionCount = 0; +        ui64 CompactBorrowedCount = 0;          NTable::TKeyAccessSample AccessStats;          void Update(NTable::TStats&& dataStats, ui64 indexSize, THashSet<ui64>&& partOwners, ui64 partCount, TInstant statsUpdateTime) { diff --git a/ydb/core/tx/schemeshard/CMakeLists.txt b/ydb/core/tx/schemeshard/CMakeLists.txt index 34eaa4f2e9a..1225cde3273 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.txt @@ -49,6 +49,7 @@ target_link_libraries(core-tx-schemeshard 
PUBLIC  )  target_sources(core-tx-schemeshard PRIVATE    ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard.cpp +  ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__compaction.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__clean_pathes.cpp    ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard__conditional_erase.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp new file mode 100644 index 00000000000..1da2fd32e6d --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp @@ -0,0 +1,161 @@ +#include "schemeshard_impl.h" + +namespace NKikimr::NSchemeShard { + +NOperationQueue::EStartStatus TSchemeShard::StartBorrowedCompaction(const TShardIdx& shardIdx) { +    UpdateBorrowedCompactionQueueMetrics(); + +    auto ctx = TActivationContext::ActorContextFor(SelfId()); + +    auto it = ShardInfos.find(shardIdx); +    if (it == ShardInfos.end()) { +        LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info " +            "for borrowed compaction# " << shardIdx +            << " at schemeshard# " << TabletID()); + +        return NOperationQueue::EStartStatus::EOperationRemove; +    } + +    const auto& datashardId = it->second.TabletID; +    const auto& pathId = it->second.PathId; + +    LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBorrowedCompaction " +        "for pathId# " << pathId << ", datashard# " << datashardId +        << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() +        << ", rate# " << BorrowedCompactionQueue->GetRate() +        << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" +        << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" +        << " at schemeshard " << TabletID()); + +    
std::unique_ptr<TEvDataShard::TEvCompactBorrowed> request( +        new TEvDataShard::TEvCompactBorrowed(pathId.OwnerId, pathId.LocalPathId)); + +    RunningBorrowedCompactions[shardIdx] = PipeClientCache->Send( +        ctx, +        ui64(datashardId), +        request.release()); + +    return NOperationQueue::EStartStatus::EOperationRunning; +} + +void TSchemeShard::OnBorrowedCompactionTimeout(const TShardIdx& shardIdx) { +    UpdateBorrowedCompactionQueueMetrics(); +    TabletCounters->Cumulative()[COUNTER_BORROWED_COMPACTION_TIMEOUT].Increment(1); + +    RunningBorrowedCompactions.erase(shardIdx); + +    auto ctx = TActivationContext::ActorContextFor(SelfId()); + +    auto it = ShardInfos.find(shardIdx); +    if (it == ShardInfos.end()) { +        LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info " +            "for timeout borrowed compaction# " << shardIdx +            << " at schemeshard# " << TabletID()); +        return; +    } + +    const auto& datashardId = it->second.TabletID; +    const auto& pathId = it->second.PathId; + +    LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Borrowed compaction timeout " +        "for pathId# " << pathId << ", datashard# " << datashardId +        << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() +        << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" +        << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" +        << " at schemeshard " << TabletID()); + +    // retry +    EnqueueBorrowedCompaction(shardIdx); +} + +void TSchemeShard::BorrowedCompactionHandleDisconnect(TTabletId tabletId, const TActorId& clientId) { +    auto tabletIt = TabletIdToShardIdx.find(tabletId); +    if (tabletIt == TabletIdToShardIdx.end()) +        return; // just sanity check +    const auto& shardIdx = tabletIt->second; + +    auto it = RunningBorrowedCompactions.find(shardIdx); +    if (it == RunningBorrowedCompactions.end()) +        
return; + +    if (it->second != clientId) +        return; + +    RunningBorrowedCompactions.erase(it); + +    // disonnected from node we have requested borrowed compaction. We just resend request, because it +    // is safe: if first request is executing or has been already executed, then second request will be ignored. + +    StartBorrowedCompaction(shardIdx); +} + +void TSchemeShard::EnqueueBorrowedCompaction(const TShardIdx& shardIdx) { +    if (!BorrowedCompactionQueue) +        return; + +    auto ctx = TActivationContext::ActorContextFor(SelfId()); + +    LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, +        "borrowed compaction enqueue shard# " << shardIdx << " at schemeshard " << TabletID()); + +    BorrowedCompactionQueue->Enqueue(shardIdx); +    UpdateBorrowedCompactionQueueMetrics(); +} + +void TSchemeShard::RemoveBorrowedCompaction(const TShardIdx& shardIdx) { +    if (!BorrowedCompactionQueue) +        return; + +    RunningBorrowedCompactions.erase(shardIdx); +    BorrowedCompactionQueue->Remove(shardIdx); +    UpdateBorrowedCompactionQueueMetrics(); +} + +void TSchemeShard::UpdateBorrowedCompactionQueueMetrics() { +    if (!BorrowedCompactionQueue) +        return; + +    TabletCounters->Simple()[COUNTER_BORROWED_COMPACTION_QUEUE_SIZE].Set(BorrowedCompactionQueue->Size()); +    TabletCounters->Simple()[COUNTER_BORROWED_COMPACTION_QUEUE_RUNNING].Set(BorrowedCompactionQueue->RunningSize()); +} + +void TSchemeShard::Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx) { +    const auto& record = ev->Get()->Record; + +    const TTabletId tabletId(record.GetTabletId()); +    const TShardIdx shardIdx = GetShardIdx(tabletId); + +    auto pathId = TPathId( +        record.GetPathId().GetOwnerId(), +        record.GetPathId().GetLocalId()); + +    auto duration = BorrowedCompactionQueue->OnDone(shardIdx); + +    if (shardIdx == InvalidShardIdx) { +        LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, 
"Finished borrowed compaction of unknown shard " +            "for pathId# " << pathId << ", datashard# " << tabletId +            << " in# " << duration.MilliSeconds() +            << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() +            << ", rate# " << BorrowedCompactionQueue->GetRate() +            << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" +            << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" +            << " at schemeshard " << TabletID()); +    } else { +        LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished borrowed compaction " +            "for pathId# " << pathId << ", datashard# " << tabletId +            << ", shardIdx# " << shardIdx +            << " in# " << duration.MilliSeconds() +            << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta() +            << ", rate# " << BorrowedCompactionQueue->GetRate() +            << ", in queue# " << BorrowedCompactionQueue->Size() << " shards" +            << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards" +            << " at schemeshard " << TabletID()); +    } + +    RunningBorrowedCompactions.erase(shardIdx); + +    TabletCounters->Cumulative()[COUNTER_BORROWED_COMPACTION_OK].Increment(1); +    UpdateBorrowedCompactionQueueMetrics(); +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index 913ed271153..810be23a9d4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -1,7 +1,6 @@  #include "schemeshard_impl.h" -namespace NKikimr { -namespace NSchemeShard { +namespace NKikimr::NSchemeShard {  NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardCompactionInfo& info) {      UpdateBackgroundCompactionQueueMetrics(); @@ -138,7 +137,7 @@ void 
TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T      UpdateBackgroundCompactionQueueMetrics();  } -void TSchemeShard::EnqueueCompaction( +void TSchemeShard::EnqueueBackgroundCompaction(      const TShardIdx& shardIdx,      const TTableInfo::TPartitionStats& stats)  { @@ -173,7 +172,7 @@ void TSchemeShard::EnqueueCompaction(      UpdateBackgroundCompactionQueueMetrics();  } -void TSchemeShard::UpdateCompaction( +void TSchemeShard::UpdateBackgroundCompaction(      const TShardIdx& shardIdx,      const TTableInfo::TPartitionStats& newStats)  { @@ -186,7 +185,7 @@ void TSchemeShard::UpdateCompaction(          LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,              "background compaction update removed shard# " << shardIdx              << " with borrowed parts at schemeshard " << TabletID()); -        RemoveCompaction(shardIdx); +        RemoveBackgroundCompaction(shardIdx);          return;      } @@ -194,7 +193,7 @@ void TSchemeShard::UpdateCompaction(          LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,              "background compaction update removed shard# " << shardIdx              << " with loaned parts at schemeshard " << TabletID()); -        RemoveCompaction(shardIdx); +        RemoveBackgroundCompaction(shardIdx);          return;      } @@ -212,7 +211,7 @@ void TSchemeShard::UpdateCompaction(      UpdateBackgroundCompactionQueueMetrics();  } -void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) { +void TSchemeShard::RemoveBackgroundCompaction(const TShardIdx& shardIdx) {      if (!CompactionQueue)          return; @@ -221,11 +220,15 @@ void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) {  }  void TSchemeShard::ShardRemoved(const TShardIdx& shardIdx) { -    RemoveCompaction(shardIdx); +    RemoveBackgroundCompaction(shardIdx); +    RemoveBorrowedCompaction(shardIdx);      RemoveShardMetrics(shardIdx);  }  void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() { +    if 
(!CompactionQueue) +        return; +      TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_SIZE].Set(CompactionQueue->Size());      TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_RUNNING].Set(CompactionQueue->RunningSize());      TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_WAITING_REPEAT].Set(CompactionQueue->WaitingSize()); @@ -300,5 +303,4 @@ void TSchemeShard::RemoveShardMetrics(const TShardIdx& shardIdx) {      PartitionMetricsMap.erase(it);  } -} // NSchemeShard -} // NKikimr +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp index 1945d3cfed0..1da7c610a40 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp @@ -380,7 +380,7 @@ void NTableState::UpdatePartitioningForCopyTable(TOperationId operationId, TTxSt      NIceDb::TNiceDb db(context.GetDB());      // Erase previous partitioning as we are going to generate new one -    context.SS->DeleteTablePartitioning(db, txState.TargetPathId, dstTableInfo); +    context.SS->PersistTablePartitioningDeletion(db, txState.TargetPathId, dstTableInfo);      // Remove old shardIdx info and old txShards      for (const auto& shard : txState.Shards) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp index 9a4bfc7daf7..055525640ff 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp @@ -290,10 +290,11 @@ public:          auto oldAggrStats = tableInfo->GetStats().Aggregated;          // Delete the whole old partitioning and persist the whole new partitionig as the indexes have changed -        context.SS->DeleteTablePartitioning(db, tableId, tableInfo); +        context.SS->PersistTablePartitioningDeletion(db, tableId, tableInfo);          
context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning));          context.SS->PersistTablePartitioning(db, tableId, tableInfo);          context.SS->PersistTablePartitionStats(db, tableId, tableInfo); +          context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_ACTIVE_COUNT].Sub(allSrcShardIdxs.size());          context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_INACTIVE_COUNT].Add(allSrcShardIdxs.size()); diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index a464fc39f97..065836b85d1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -194,10 +194,14 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte      table->UpdateShardStats(shardIdx, newStats);      if (!table->IsBackup) { -        Self->UpdateCompaction(shardIdx, newStats); +        Self->UpdateBackgroundCompaction(shardIdx, newStats);          Self->UpdateShardMetrics(shardIdx, newStats);      } +    if (!newStats.HasBorrowedData) { +        Self->RemoveBorrowedCompaction(shardIdx); +    } +      NIceDb::TNiceDb db(txc.DB);      if (!table->IsBackup && !table->IsShardsStatsDetached()) { @@ -333,7 +337,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte      if (newStats.HasBorrowedData) {          // We don't want to split shards that have borrow parts          // We must ask them to compact first -        CompactEv.Reset(new TEvDataShard::TEvCompactBorrowed(tableId)); +        Self->EnqueueBorrowedCompaction(shardIdx);          return true;      } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index a117a6690ac..9e8deb75f36 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -90,7 +90,7 @@ void TSchemeShard::ActivateAfterInitialization(const 
TActorContext& ctx,      ScheduleCleanDroppedPaths();      ScheduleCleanDroppedSubDomains(); -    StartStopCompactionQueue(); +    StartStopCompactionQueues();      ctx.Send(TxAllocatorClient, MakeHolder<TEvTxAllocatorClient::TEvAllocate>(InitiateCachedTxIdsCount)); @@ -285,10 +285,17 @@ TMessageSeqNo TSchemeShard::NextRound() {  void TSchemeShard::Clear() {      PathsById.clear();      Tables.clear(); +      if (CompactionQueue) {          CompactionQueue->Clear();          UpdateBackgroundCompactionQueueMetrics();      } + +    if (BorrowedCompactionQueue) { +        BorrowedCompactionQueue->Clear(); +        UpdateBorrowedCompactionQueueMetrics(); +    } +      ShardsWithBorrowed.clear();      ShardsWithLoaned.clear();      PersQueueGroups.clear(); @@ -2078,7 +2085,7 @@ void TSchemeShard::PersistTablePartitioning(NIceDb::TNiceDb& db, const TPathId p      }  } -void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) { +void TSchemeShard::PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {      const auto& partitions = tableInfo->GetPartitions();      for (ui64 pi = 0; pi < partitions.size(); ++pi) {          if (IsLocalId(pathId)) { @@ -2086,8 +2093,6 @@ void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pa          }          db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete();          db.Table<Schema::TablePartitionStats>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete(); - -        ShardRemoved(partitions[pi].ShardIdx);      }  } @@ -3679,6 +3684,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info)          new TPipeClientFactory(this)))      , PipeTracker(*PipeClientCache)      , CompactionStarter(this) +    , BorrowedCompactionStarter(this)      , ShardDeleter(info->TabletID)      , AllowDataColumnForIndexTable(0, 0, 1)      , 
EnableAsyncIndexes(0, 0, 1) @@ -3753,6 +3759,9 @@ void TSchemeShard::Die(const TActorContext &ctx) {      if (CompactionQueue)          CompactionQueue->Shutdown(ctx); +    if (BorrowedCompactionQueue) +        BorrowedCompactionQueue->Shutdown(ctx); +      return IActor::Die(ctx);  } @@ -3788,10 +3797,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {      EnableBackgroundCompaction = appData->FeatureFlags.GetEnableBackgroundCompaction();      EnableBackgroundCompactionServerless = appData->FeatureFlags.GetEnableBackgroundCompactionServerless(); -    if (!CompactionQueue) -        ConfigureCompactionQueue(appData->CompactionConfig.GetBackgroundCompactionConfig(), ctx); -    ctx.RegisterWithSameMailbox(CompactionQueue); +    ConfigureCompactionQueues(appData->CompactionConfig, ctx);      if (appData->ChannelProfiles) {          ChannelProfiles = appData->ChannelProfiles; @@ -3969,6 +3976,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {          HFuncTraced(TEvSchemeShard::TEvMigrateSchemeShardResult, Handle);          HFuncTraced(TEvDataShard::TEvMigrateSchemeShardResponse, Handle);          HFuncTraced(TEvDataShard::TEvCompactTableResult, Handle); +        HFuncTraced(TEvDataShard::TEvCompactBorrowedResult, Handle);          HFuncTraced(TEvSchemeShard::TEvSyncTenantSchemeShard, Handle);          HFuncTraced(TEvSchemeShard::TEvUpdateTenantSchemeShard, Handle); @@ -4609,6 +4617,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc                     << ", to tablet: " << tabletId                     << ", at schemeshard: " << TabletID()); +    BorrowedCompactionHandleDisconnect(tabletId, clientId);      RestartPipeTx(tabletId, ctx);  } @@ -4646,6 +4655,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc          return;      } +    BorrowedCompactionHandleDisconnect(tabletId, clientId);      RestartPipeTx(tabletId, ctx);  } @@ -5822,14 +5832,32 @@ void 
TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T      }      if (!tableInfo->IsBackup) { +        // partitions updated: +        // 1. We need to remove some parts from compaction queues. +        // 2. We need to add new ones to the queues. Since we can safely enqueue already +        // enqueued parts, we simple enqueue each part in newPartitioning +        THashSet<TShardIdx> newPartitioningSet; +        newPartitioningSet.reserve(newPartitioning.size()); +        const auto& oldPartitioning = tableInfo->GetPartitions(); +          for (const auto& p: newPartitioning) { +            if (!oldPartitioning.empty()) +                newPartitioningSet.insert(p.ShardIdx); +              const auto& partitionStats = tableInfo->GetStats().PartitionStats;              auto it = partitionStats.find(p.ShardIdx);              if (it != partitionStats.end()) { -                EnqueueCompaction(p.ShardIdx, it->second); +                EnqueueBackgroundCompaction(p.ShardIdx, it->second);                  UpdateShardMetrics(p.ShardIdx, it->second);              }          } + +        for (const auto& p: oldPartitioning) { +            if (!newPartitioningSet.contains(p.ShardIdx)) { +                // note that queues might not contain the shard +                ShardRemoved(p.ShardIdx); +            } +        }      }      tableInfo->SetPartitioning(std::move(newPartitioning)); @@ -5980,13 +6008,11 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi      if (appConfig.HasCompactionConfig()) {          const auto& compactionConfig = appConfig.GetCompactionConfig(); -        if (compactionConfig.HasBackgroundCompactionConfig()) { -            ConfigureCompactionQueue(compactionConfig.GetBackgroundCompactionConfig(), ctx); -        } +        ConfigureCompactionQueues(compactionConfig, ctx);      }      if (IsShemeShardConfigured()) { -        StartStopCompactionQueue(); +        StartStopCompactionQueues();      }  
} @@ -6004,7 +6030,24 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu      EnableBackgroundCompactionServerless = featureFlags.GetEnableBackgroundCompactionServerless();  } -void TSchemeShard::ConfigureCompactionQueue( +void TSchemeShard::ConfigureCompactionQueues( +    const NKikimrConfig::TCompactionConfig& compactionConfig, +    const TActorContext &ctx) +{ +    if (compactionConfig.HasBackgroundCompactionConfig()) { +        ConfigureBackgroundCompactionQueue(compactionConfig.GetBackgroundCompactionConfig(), ctx); +    } else { +        ConfigureBackgroundCompactionQueue(NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig(), ctx); +    } + +    if (compactionConfig.HasBorrowedCompactionConfig()) { +        ConfigureBorrowedCompactionQueue(compactionConfig.GetBorrowedCompactionConfig(), ctx); +    } else { +        ConfigureBorrowedCompactionQueue(NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig(), ctx); +    } +} + +void TSchemeShard::ConfigureBackgroundCompactionQueue(      const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config,      const TActorContext &ctx)  { @@ -6035,10 +6078,11 @@ void TSchemeShard::ConfigureCompactionQueue(              compactionConfig,              queueConfig,              CompactionStarter); +        ctx.RegisterWithSameMailbox(CompactionQueue);      }      LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, -                 "CompactionQueue configured: Timeout# " << compactionConfig.Timeout +                 "BackgroundCompactionQueue configured: Timeout# " << compactionConfig.Timeout                   << ", compact single parted# " << (queueConfig.CompactSinglePartedShards ? 
"yes" : "no")                   << ", Rate# " << CompactionQueue->GetRate()                   << ", WakeupInterval# " << compactionConfig.WakeupInterval @@ -6048,7 +6092,34 @@ void TSchemeShard::ConfigureCompactionQueue(                   << ", MaxRate# " << compactionConfig.MaxRate);  } -void TSchemeShard::StartStopCompactionQueue() { +void TSchemeShard::ConfigureBorrowedCompactionQueue( +    const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config, +    const TActorContext &ctx) +{ +    TBorrowedCompactionQueue::TConfig compactionConfig; + +    compactionConfig.IsCircular = false; +    compactionConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds()); +    compactionConfig.InflightLimit = config.GetInflightLimit(); +    compactionConfig.MaxRate = config.GetMaxRate(); + +    if (BorrowedCompactionQueue) { +        BorrowedCompactionQueue->UpdateConfig(compactionConfig); +    } else { +        BorrowedCompactionQueue = new TBorrowedCompactionQueue( +            compactionConfig, +            BorrowedCompactionStarter); +        ctx.RegisterWithSameMailbox(BorrowedCompactionQueue); +    } + +    LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, +                 "BorrowedCompactionQueue configured: Timeout# " << compactionConfig.Timeout +                 << ", Rate# " << BorrowedCompactionQueue->GetRate() +                 << ", WakeupInterval# " << compactionConfig.WakeupInterval +                 << ", InflightLimit# " << compactionConfig.InflightLimit); +} + +void TSchemeShard::StartStopCompactionQueues() {      // note, that we don't need to check current state of compaction queue      if (IsServerlessDomain(TPath::Init(RootPathId(), this))) {          if (EnableBackgroundCompactionServerless) { @@ -6063,6 +6134,8 @@ void TSchemeShard::StartStopCompactionQueue() {              CompactionQueue->Stop();          }      } + +    BorrowedCompactionQueue->Start();  }  void 
TSchemeShard::Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr &, const TActorContext &ctx) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 0eba6b36594..40aba655404 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -99,6 +99,30 @@ private:          TSchemeShard* Self;      }; +    using TBorrowedCompactionQueue = NOperationQueue::TOperationQueueWithTimer< +        TShardIdx, +        TFifoQueue<TShardIdx>, +        TEvPrivate::EvRunBorrowedCompaction, +        NKikimrServices::FLAT_TX_SCHEMESHARD>; + +    class TBorrowedCompactionStarter : public TBorrowedCompactionQueue::IStarter { +    public: +        TBorrowedCompactionStarter(TSchemeShard* self) +            : Self(self) +        { } + +        NOperationQueue::EStartStatus StartOperation(const TShardIdx& shardIdx) override { +            return Self->StartBorrowedCompaction(shardIdx); +        } + +        void OnTimeout(const TShardIdx& shardIdx) override { +            Self->OnBorrowedCompactionTimeout(shardIdx); +        } + +    private: +        TSchemeShard* Self; +    }; +  public:      static constexpr ui32 DefaultPQTabletPartitionsCount = 1;      static constexpr ui32 MaxPQTabletPartitionsCount = 1000; @@ -207,6 +231,13 @@ public:      TCompactionStarter CompactionStarter;      TCompactionQueue* CompactionQueue = nullptr; + +    TBorrowedCompactionStarter BorrowedCompactionStarter; +    TBorrowedCompactionQueue* BorrowedCompactionQueue = nullptr; + +    // shardIdx -> clientId +    THashMap<TShardIdx, TActorId> RunningBorrowedCompactions; +      THashSet<TShardIdx> ShardsWithBorrowed; // shards have parts from another shards      THashSet<TShardIdx> ShardsWithLoaned;   // shards have parts loaned to another shards      bool EnableBackgroundCompaction = false; @@ -298,10 +329,19 @@ public:      void ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& 
appConfig, const TActorContext& ctx);      void ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featureFlags, const TActorContext& ctx); -    void ConfigureCompactionQueue( +    void ConfigureCompactionQueues( +        const NKikimrConfig::TCompactionConfig& config, +        const TActorContext &ctx); + +    void ConfigureBackgroundCompactionQueue(          const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config,          const TActorContext &ctx); -    void StartStopCompactionQueue(); + +    void ConfigureBorrowedCompactionQueue( +        const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config, +        const TActorContext &ctx); + +    void StartStopCompactionQueues();      bool ApplyStorageConfig(const TStoragePools& storagePools,                              const NKikimrSchemeOp::TStorageConfig& storageConfig, @@ -464,7 +504,7 @@ public:      void PersistTable(NIceDb::TNiceDb &db, const TPathId pathId);      void PersistChannelsBinding(NIceDb::TNiceDb& db, const TShardIdx shardId, const TChannelsBindings& bindedChannels);      void PersistTablePartitioning(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo); -    void DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo); +    void PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo);      void PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPathId& pathId, ui64 id, const TTableInfo::TPtr tableInfo);      void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TTableInfo::TPartitionStats& stats);      void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TShardIdx& shardIdx, const TTableInfo::TPtr tableInfo); @@ -640,9 +680,12 @@ public:      void ScheduleCleanDroppedPaths();      void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const 
TActorContext& ctx); -    void EnqueueCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); -    void UpdateCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); -    void RemoveCompaction(const TShardIdx& shardIdx); +    void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); +    void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); +    void RemoveBackgroundCompaction(const TShardIdx& shardIdx); + +    void EnqueueBorrowedCompaction(const TShardIdx& shardIdx); +    void RemoveBorrowedCompaction(const TShardIdx& shardIdx);      void UpdateShardMetrics(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats);      void RemoveShardMetrics(const TShardIdx& shardIdx); @@ -653,6 +696,11 @@ public:      void OnBackgroundCompactionTimeout(const TShardCompactionInfo& info);      void UpdateBackgroundCompactionQueueMetrics(); +    NOperationQueue::EStartStatus StartBorrowedCompaction(const TShardIdx& shardIdx); +    void OnBorrowedCompactionTimeout(const TShardIdx& shardIdx); +    void BorrowedCompactionHandleDisconnect(TTabletId tabletId, const TActorId& clientId); +    void UpdateBorrowedCompactionQueueMetrics(); +      struct TTxCleanDroppedSubDomains;      NTabletFlatExecutor::ITransaction* CreateTxCleanDroppedSubDomains(); @@ -817,6 +865,7 @@ public:      void Handle(TEvSchemeShard::TEvMigrateSchemeShardResult::TPtr& ev, const TActorContext& ctx);      void Handle(TEvDataShard::TEvMigrateSchemeShardResponse::TPtr& ev, const TActorContext& ctx);      void Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx); +    void Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx);      void Handle(TEvSchemeShard::TEvSyncTenantSchemeShard::TPtr& ev, const TActorContext& ctx);      void Handle(TEvSchemeShard::TEvUpdateTenantSchemeShard::TPtr& ev, const 
TActorContext& ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 207fd8d2c77..751de0231ec 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -21,6 +21,7 @@ struct TEvPrivate {          EvSubscribeToShardDeletion,          EvNotifyShardDeleted,          EvRunBackgroundCompaction, +        EvRunBorrowedCompaction,          EvCompletePublication,          EvCompleteBarrier,          EvEnd diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp index b5e1fac3fe7..4da2c1e22b1 100644 --- a/ydb/core/tx/schemeshard/ut_compaction.cpp +++ b/ydb/core/tx/schemeshard/ut_compaction.cpp @@ -73,25 +73,13 @@ TPathInfo GetPathInfo(      return info;  } -void CreateTableWithData( +void WriteData(      TTestActorRuntime &runtime, -    TTestEnv& env, -    const char* path,      const char* name, -    ui32 shardsCount, -    ui64& txId, -    ui64 schemeshardId = TTestTxConfig::SchemeShard) +    ui64 fromKeyInclusive, +    ui64 toKey, +    ui64 tabletId = TTestTxConfig::FakeHiveTablets)  { -    TestCreateTable(runtime, schemeshardId, ++txId, path, -        Sprintf(R"____( -            Name: "%s" -            Columns { Name: "key"  Type: "Uint64"} -            Columns { Name: "value" Type: "Utf8"} -            KeyColumnNames: ["key"] -            UniformPartitionsCount: %d -        )____", name, shardsCount)); -    env.TestWaitNotification(runtime, txId, schemeshardId); -      auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) {          TString writeQuery = Sprintf(R"(              ( @@ -107,21 +95,56 @@ void CreateTableWithData(          UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;      }; -    for (ui64 key = 0; key < 100; ++key) { -        fnWriteRow(TTestTxConfig::FakeHiveTablets, key, name); +    for (ui64 key = fromKeyInclusive; key < toKey; ++key) { +        
fnWriteRow(tabletId, key, name);      }  } -void SetFeatures( +void CreateTableWithData( +    TTestActorRuntime &runtime, +    TTestEnv& env, +    const char* path, +    const char* name, +    ui32 shardsCount, +    ui64& txId, +    ui64 schemeshardId = TTestTxConfig::SchemeShard) +{ +    TestCreateTable(runtime, schemeshardId, ++txId, path, +        Sprintf(R"____( +            Name: "%s" +            Columns { Name: "key"  Type: "Uint64"} +            Columns { Name: "value" Type: "Utf8"} +            KeyColumnNames: ["key"] +            UniformPartitionsCount: %d +            PartitionConfig { +                PartitioningPolicy { +                    MinPartitionsCount: %d +                    MaxPartitionsCount: %d +                } +            } +        )____", name, shardsCount, shardsCount, shardsCount)); +    env.TestWaitNotification(runtime, txId, schemeshardId); + +    WriteData(runtime, name, 0, 100); +} + +void SetConfig(      TTestActorRuntime &runtime, -    TTestEnv&,      ui64 schemeShard, -    const NKikimrConfig::TFeatureFlags& features) +    THolder<NConsole::TEvConsole::TEvConfigNotificationRequest> request)  { +    auto sender = runtime.AllocateEdgeActor(); + +    runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); + +    TAutoPtr<IEventHandle> handle; +    runtime.GrabEdgeEventRethrow<NConsole::TEvConsole::TEvConfigNotificationResponse>(handle); +} + +THolder<NConsole::TEvConsole::TEvConfigNotificationRequest> GetTestCompactionConfig() {      auto request = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>(); -    *request->Record.MutableConfig()->MutableFeatureFlags() = features; -    // little hack to simplify life +    // little hacks to simplify life      auto* compactionConfig = request->Record.MutableConfig()->MutableCompactionConfig();      compactionConfig->MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);      
compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0); @@ -134,12 +157,20 @@ void SetFeatures(      compactionConfig->MutableBackgroundCompactionConfig()->SetMaxRate(1);      compactionConfig->MutableBackgroundCompactionConfig()->SetRoundSeconds(0); -    auto sender = runtime.AllocateEdgeActor(); +    compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); -    runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); +    return request; +} -    TAutoPtr<IEventHandle> handle; -    runtime.GrabEdgeEventRethrow<NConsole::TEvConsole::TEvConfigNotificationResponse>(handle); +void SetFeatures( +    TTestActorRuntime &runtime, +    TTestEnv&, +    ui64 schemeShard, +    const NKikimrConfig::TFeatureFlags& features) +{ +    auto request = GetTestCompactionConfig(); +    *request->Record.MutableConfig()->MutableFeatureFlags() = features; +    SetConfig(runtime, schemeShard, std::move(request));  }  void SetBackgroundCompactionServerless(TTestActorRuntime &runtime, TTestEnv& env, ui64 schemeShard, bool value) { @@ -176,6 +207,8 @@ void DisableBackgroundCompactionViaRestart(      compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1);      compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0); +    compactionConfig.MutableBorrowedCompactionConfig()->SetInflightLimit(1); +      TActorId sender = runtime.AllocateEdgeActor();      RebootTablet(runtime, schemeShard, sender);  } @@ -203,6 +236,8 @@ void EnableBackgroundCompactionViaRestart(      compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1);      compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0); +    compactionConfig.MutableBorrowedCompactionConfig()->SetInflightLimit(1); +      TActorId sender = runtime.AllocateEdgeActor();      RebootTablet(runtime, schemeShard, sender);  } @@ -210,17 +245,20 @@ void EnableBackgroundCompactionViaRestart(  struct TCompactionStats {      ui64 
BackgroundRequestCount = 0;      ui64 BackgroundCompactionCount = 0; +    ui64 CompactBorrowedCount = 0;      TCompactionStats() = default;      TCompactionStats(const NKikimrTxDataShard::TEvGetCompactTableStatsResult& stats)          : BackgroundRequestCount(stats.GetBackgroundCompactionRequests())          , BackgroundCompactionCount(stats.GetBackgroundCompactionCount()) +        , CompactBorrowedCount(stats.GetCompactBorrowedCount())      {}      void Update(const TCompactionStats& other) {          BackgroundRequestCount += other.BackgroundRequestCount;          BackgroundCompactionCount += other.BackgroundCompactionCount; +        CompactBorrowedCount += other.CompactBorrowedCount;      }  }; @@ -276,12 +314,41 @@ TCompactionStats GetCompactionStats(          info.OwnerId);  } -void CheckShardCompacted( +void CheckShardBorrowedCompacted(      TTestActorRuntime &runtime,      const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable,      ui64 tabletId, -    ui64 ownerId, -    bool shouldCompacted = true) +    ui64 ownerId) +{ +    auto count = GetCompactionStats( +        runtime, +        userTable, +        tabletId, +        ownerId).CompactBorrowedCount; + +    UNIT_ASSERT(count > 0); +} + +void CheckShardNotBorrowedCompacted( +    TTestActorRuntime &runtime, +    const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, +    ui64 tabletId, +    ui64 ownerId) +{ +    auto count = GetCompactionStats( +        runtime, +        userTable, +        tabletId, +        ownerId).CompactBorrowedCount; + +    UNIT_ASSERT_VALUES_EQUAL(count, 0UL); +} + +void CheckShardBackgroundCompacted( +    TTestActorRuntime &runtime, +    const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, +    ui64 tabletId, +    ui64 ownerId)  {      auto count = GetCompactionStats(          runtime, @@ -289,14 +356,25 @@ void CheckShardCompacted(          tabletId,          ownerId).BackgroundRequestCount; -    if (shouldCompacted) { -        
UNIT_ASSERT(count > 0); -    } else { -        UNIT_ASSERT_VALUES_EQUAL(count, 0UL); -    } +    UNIT_ASSERT(count > 0); +} + +void CheckShardNotBackgroundCompacted( +    TTestActorRuntime &runtime, +    const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable, +    ui64 tabletId, +    ui64 ownerId) +{ +    auto count = GetCompactionStats( +        runtime, +        userTable, +        tabletId, +        ownerId).BackgroundRequestCount; + +    UNIT_ASSERT_VALUES_EQUAL(count, 0UL);  } -void CheckNoCompactionsInPeriod( +void CheckNoBackgroundCompactionsInPeriod(      TTestActorRuntime &runtime,      TTestEnv& env,      const TString& path, @@ -338,8 +416,10 @@ void TestBackgroundCompaction(      enableBackgroundCompactionFunc(runtime, env);      env.SimulateSleep(runtime, TDuration::Seconds(30)); -    for (auto shard: info.Shards) -        CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId); +    for (auto shard: info.Shards) { +        CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); +        CheckShardNotBorrowedCompacted(runtime, info.UserTable, shard, info.OwnerId); +    }  }  ui64 TestServerless( @@ -430,8 +510,14 @@ ui64 TestServerless(      env.SimulateSleep(runtime, TDuration::Seconds(30)); -    for (auto shard: info.Shards) -        CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId, enableServerless); +    for (auto shard: info.Shards) { +        if (enableServerless) +            CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); +        else +            CheckShardNotBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); + +        CheckShardNotBorrowedCompacted(runtime, info.UserTable, shard, info.OwnerId); +    }      return schemeshardId;  } @@ -494,7 +580,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {          // some time to finish compactions in progress          env.SimulateSleep(runtime, TDuration::Seconds(30)); -        
CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); +        CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/Simple");      }      Y_UNIT_TEST(ShouldNotCompactServerless) { @@ -536,7 +622,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {          // some time to finish compactions in progress          env.SimulateSleep(runtime, TDuration::Seconds(30)); -        CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId); +        CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId);      }      Y_UNIT_TEST(SchemeshardShouldNotCompactBackups) { @@ -565,7 +651,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {          SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); -        CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); +        CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");          UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL);      } @@ -611,11 +697,11 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {          SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); -        CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable"); +        CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");          UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL);          // original table should not be compacted as well -        CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple"); +        CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/Simple");      }      Y_UNIT_TEST(SchemeshardShouldHandleCompactionTimeouts) { @@ -652,14 +738,345 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {          // note that we create 1-sharded table to avoid complications          CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, 
txId); -        env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); -        UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 1UL); +        while (compactionResultCount < 3UL) +            env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); +    } +}; + +Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) { +    Y_UNIT_TEST(SchemeshardShouldCompactBorrowed) { +        TTestBasicRuntime runtime; +        TTestEnv env(runtime); + +        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); +        runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + +        // in case it is not enabled by default +        SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + +        auto configRequest = GetTestCompactionConfig(); +        auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); +        compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); + +        SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + +        ui64 txId = 1000; + +        CreateTableWithData(runtime, env, "/MyRoot", "Simple", 5, txId); + +        { +            // write to all shards in hacky way +            auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); +            for (auto shard: simpleInfo.Shards) { +                WriteData(runtime, "Simple", 0, 100, shard); +            } +        } +        env.SimulateSleep(runtime, TDuration::Seconds(1)); + +        // copy table +        TestCreateTable(runtime, ++txId, "/MyRoot", R"( +            Name: "CopyTable" +            CopyFromTable: "/MyRoot/Simple" +        )"); +        env.TestWaitNotification(runtime, txId); + +        env.SimulateSleep(runtime, TDuration::Seconds(30)); + +        auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); +        auto copyInfo = GetPathInfo(runtime, "/MyRoot/CopyTable"); + +        
// borrow compaction only runs when we split, so nothing should be borrow compacted yet + +        { +            for (auto shard: simpleInfo.Shards) { +                CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); +            } +        } + +        { +            for (auto shard: copyInfo.Shards) { +                CheckShardNotBorrowedCompacted(runtime, copyInfo.UserTable, shard, copyInfo.OwnerId); +            } +        } + +        // now force split + +        TestAlterTable(runtime, ++txId, "/MyRoot", R"( +                        Name: "CopyTable" +                        PartitionConfig { +                            PartitioningPolicy { +                                MinPartitionsCount: 20 +                                MaxPartitionsCount: 20 +                                SizeToSplit: 1 +                            } +                        })"); +        env.TestWaitNotification(runtime, txId); + +        // schemeshard should get stats from DS to start borrower compactions +        env.SimulateSleep(runtime, TDuration::Seconds(30)); + +        // should compact all borrowed data (note that background will not compact until then) + +        { +            for (auto shard: copyInfo.Shards) { +                CheckShardBorrowedCompacted(runtime, copyInfo.UserTable, shard, copyInfo.OwnerId); +            } +        } + +        { +            // Simple is again the only owner +            for (auto shard: simpleInfo.Shards) { +                CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); +            } +        } + +        // now should be no borrower compactions, but background should do the job + +        auto copyCount1 = GetCompactionStats(runtime, "/MyRoot/CopyTable").CompactBorrowedCount; +        auto simpleCount1 = GetCompactionStats(runtime, "/MyRoot/Simple").CompactBorrowedCount; +        env.SimulateSleep(runtime, TDuration::Seconds(30)); + +        { + 
           auto info = GetPathInfo(runtime, "/MyRoot/Simple"); +            for (auto shard: info.Shards) { +                CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); +            } +            auto simpleCount2 = GetCompactionStats(runtime, "/MyRoot/Simple").CompactBorrowedCount; +            UNIT_ASSERT_VALUES_EQUAL(simpleCount1, simpleCount2); +        } + +        { +            auto info = GetPathInfo(runtime, "/MyRoot/CopyTable"); +            for (auto shard: info.Shards) { +                CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId); +            } +            auto copyCount2 = GetCompactionStats(runtime, "/MyRoot/CopyTable").CompactBorrowedCount; +            UNIT_ASSERT_VALUES_EQUAL(copyCount1, copyCount2); +        } +    } + +    Y_UNIT_TEST(SchemeshardShouldHandleBorrowCompactionTimeouts) { +        TTestBasicRuntime runtime; +        TTestEnv env(runtime); + +        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); +        runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + +        auto configRequest = GetTestCompactionConfig(); +        auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); +        compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); +        compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(3); + +        SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + +        size_t borrowedRequests = 0; + +        // capture original observer func by setting dummy one +        auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { +            return TTestActorRuntime::EEventAction::PROCESS; +        }); +        // now set our observer backed up by original +        runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { +         
   switch (ev->GetTypeRewrite()) { +            case TEvDataShard::EvCompactBorrowed: { +                Y_UNUSED(ev.Release()); +                ++borrowedRequests; +                return TTestActorRuntime::EEventAction::DROP; +            } +            default: +                return originalObserver(runtime, ev); +            } +        }); + +        ui64 txId = 1000; + +        // note that we create 1-sharded table to avoid complications +        CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + +        // copy table +        TestCreateTable(runtime, ++txId, "/MyRoot", R"( +            Name: "CopyTable" +            CopyFromTable: "/MyRoot/Simple" +        )"); +        env.TestWaitNotification(runtime, txId); + +        // now force split +        TestAlterTable(runtime, ++txId, "/MyRoot", R"( +                        Name: "CopyTable" +                        PartitionConfig { +                            PartitioningPolicy { +                                MinPartitionsCount: 2 +                                MaxPartitionsCount: 2 +                                SizeToSplit: 1 +                            } +                        })"); +        env.TestWaitNotification(runtime, txId); + +        // wait until DS reports that it has borrowed data +        while (borrowedRequests < 1) { +            env.SimulateSleep(runtime, TDuration::Seconds(1)); +        } + +        env.SimulateSleep(runtime, TDuration::Seconds(60)); + +        while (borrowedRequests < 3) +            env.SimulateSleep(runtime, TDuration::Seconds(10)); +    } + +    Y_UNIT_TEST(SchemeshardShouldHandleDataShardReboot) { +        TTestBasicRuntime runtime; +        TTestEnv env(runtime); + +        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); +        runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + +        auto configRequest = GetTestCompactionConfig(); +        auto* compactionConfig = 
configRequest->Record.MutableConfig()->MutableCompactionConfig(); +        compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); +        compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(1000); // avoid timeouts + +        // now we have 1 inflight which will hang the queue in case of a long timeout +        SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); -        env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); -        UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 2UL); +        size_t borrowedRequests = 0; + +        // capture original observer func by setting dummy one +        auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { +            return TTestActorRuntime::EEventAction::PROCESS; +        }); +        // now set our observer backed up by original +        runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { +            switch (ev->GetTypeRewrite()) { +            case TEvDataShard::EvCompactBorrowed: { +                Y_UNUSED(ev.Release()); +                ++borrowedRequests; +                return TTestActorRuntime::EEventAction::DROP; +            } +            default: +                return originalObserver(runtime, ev); +            } +        }); +        ui64 txId = 1000; + +        // note that we create 1-sharded table to avoid complications +        CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + +        // copy table +        TestCreateTable(runtime, ++txId, "/MyRoot", R"( +            Name: "CopyTable" +            CopyFromTable: "/MyRoot/Simple" +        )"); +        env.TestWaitNotification(runtime, txId); + +        // now force split +        TestAlterTable(runtime, ++txId, "/MyRoot", R"( +                        Name: "CopyTable" +                        PartitionConfig { +                            PartitioningPolicy { 
+                                MinPartitionsCount: 2 +                                MaxPartitionsCount: 2 +                                SizeToSplit: 1 +                            } +                        })"); +        env.TestWaitNotification(runtime, txId); + +        // wait until DS reports that it has borrowed data +        while (borrowedRequests < 1) { +            env.SimulateSleep(runtime, TDuration::Seconds(1)); +        } + +        env.SimulateSleep(runtime, TDuration::Seconds(1)); +        UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 1UL); + +        env.SimulateSleep(runtime, TDuration::Seconds(1)); +        UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 1UL); + +        auto info = GetPathInfo(runtime, "/MyRoot/CopyTable"); +        UNIT_ASSERT_VALUES_EQUAL(info.Shards.size(), 1UL); + +        // break the pipes and check that SS requested compaction again +        TActorId sender = runtime.AllocateEdgeActor(); +        RebootTablet(runtime, info.Shards[0], sender); +        env.SimulateSleep(runtime, TDuration::Seconds(1)); +        UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 2UL); + +        // one more time +        RebootTablet(runtime, info.Shards[0], sender); +        env.SimulateSleep(runtime, TDuration::Seconds(1)); +        UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 3UL); +    } + +    Y_UNIT_TEST(SchemeshardShouldNotCompactAfterDrop) { +        TTestBasicRuntime runtime; +        TTestEnv env(runtime); + +        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); +        runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + +        auto configRequest = GetTestCompactionConfig(); +        auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); +        compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); +        compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(5); + +        // now we have 1 inflight 
which will hang the queue in case of a long timeout +        SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + +        size_t borrowedRequests = 0; + +        // capture original observer func by setting dummy one +        auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) { +            return TTestActorRuntime::EEventAction::PROCESS; +        }); +        // now set our observer backed up by original +        runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) { +            switch (ev->GetTypeRewrite()) { +            case TEvDataShard::EvCompactBorrowed: { +                Y_UNUSED(ev.Release()); +                ++borrowedRequests; +                return TTestActorRuntime::EEventAction::DROP; +            } +            default: +                return originalObserver(runtime, ev); +            } +        }); +        ui64 txId = 1000; + +        // note that we create 1-sharded table to avoid complications +        CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + +        // copy table +        TestCreateTable(runtime, ++txId, "/MyRoot", R"( +            Name: "CopyTable" +            CopyFromTable: "/MyRoot/Simple" +        )"); +        env.TestWaitNotification(runtime, txId); + +        // now force split +        TestAlterTable(runtime, ++txId, "/MyRoot", R"( +                        Name: "CopyTable" +                        PartitionConfig { +                            PartitioningPolicy { +                                MinPartitionsCount: 2 +                                MaxPartitionsCount: 2 +                                SizeToSplit: 1 +                            } +                        })"); +        env.TestWaitNotification(runtime, txId); + +        // wait until DS reports that it has borrowed data +        while (borrowedRequests < 1) { +            env.SimulateSleep(runtime, TDuration::MilliSeconds(100)); 
+        } + +        auto requestsBefore = borrowedRequests; + +        // SS waits reply from DS, drop the table meanwhile +        TestDropTable(runtime, ++txId, "/MyRoot", "CopyTable"); +        env.TestWaitNotification(runtime, txId); +        env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets + 1); -        env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1)); -        UNIT_ASSERT_VALUES_EQUAL(compactionResultCount, 3UL); +        env.SimulateSleep(runtime, TDuration::Seconds(10)); // 2x timeout +        UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, requestsBefore);      }  }; diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h index 514574d160c..61bed56bc30 100644 --- a/ydb/core/util/operation_queue.h +++ b/ydb/core/util/operation_queue.h @@ -555,10 +555,19 @@ void TOperationQueue<T, TQueue>::CheckTimeoutOperations() {      while (!RunningItems.Empty()) {          const auto& item = RunningItems.Front();          if (item.Timestamp + Config.Timeout <= now) { -            Starter.OnTimeout(item.Item); -            if (Config.IsCircular) -                ReEnqueueNoStart(std::move(item.Item)); +            auto movedItem = std::move(item.Item); + +            // we want to pop before calling OnTimeout, because +            // it might want to enqueue in case of non-circular +            // queue              RunningItems.PopFront(); + +            // note that OnTimeout() can enqueue item back +            // in case of non-circular queue +            Starter.OnTimeout(movedItem); + +            if (Config.IsCircular) +                ReEnqueueNoStart(std::move(movedItem));              continue;          }          break;  | 
