aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <eivanov89@yandex-team.ru>2022-06-06 16:47:10 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 16:47:10 +0300
commit31846aab71130c74baf2a34115726954b1ff26df (patch)
tree07fc8b91c5ed880df145e541f52b8ec2f0870bf9
parentfa8ff7b07c7ef2cac601269486b51a1b720dcd02 (diff)
downloadydb-31846aab71130c74baf2a34115726954b1ff26df.tar.gz
PR from branch users/eivanov89/compaction_fixes_22-2
KIKIMR-14017: implement queue to compact borrowed data REVIEW: 2550334 KIKIMR-14761: fix bug to always wakeup compaction queue REVIEW: 2560061 REVIEW: 2600932 x-ydb-stable-ref: 29443775e0b4b67d4a92285ff83902639665a1ae
-rw-r--r--ydb/core/protos/config.proto9
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/protos/counters_schemeshard.proto6
-rw-r--r--ydb/core/protos/tx_datashard.proto6
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp5
-rw-r--r--ydb/core/tablet_flat/flat_database.h2
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp5
-rw-r--r--ydb/core/tablet_flat/flat_executor.h2
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp27
-rw-r--r--ydb/core/tablet_flat/flat_table.h2
-rw-r--r--ydb/core/tablet_flat/flat_table_stats.h1
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h2
-rw-r--r--ydb/core/tx/datashard/datashard.h42
-rw-r--r--ydb/core/tx/datashard/datashard__compact_borrowed.cpp25
-rw-r--r--ydb/core/tx/datashard/datashard__compaction.cpp67
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h15
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h1
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h21
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp161
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__compaction.cpp30
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp103
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h61
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_private.h1
-rw-r--r--ydb/core/tx/schemeshard/ut_compaction.cpp562
-rw-r--r--ydb/core/tx/schemeshard/ya.make1
-rw-r--r--ydb/core/util/operation_queue.h51
-rw-r--r--ydb/core/util/operation_queue_priority_ut.cpp17
-rw-r--r--ydb/core/util/operation_queue_ut.cpp53
-rw-r--r--ydb/core/util/token_bucket.h15
-rw-r--r--ydb/core/util/ya.make1
33 files changed, 1110 insertions, 198 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 8bc714d0d1..167348a284 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1440,7 +1440,16 @@ message TCompactionConfig {
optional bool CompactSinglePartedShards = 10 [default = false];
}
+ message TBorrowedCompactionConfig {
+ optional double MaxRate = 1 [default = 0]; // unlimited
+ optional uint64 InflightLimit = 2 [default = 10]; // TODO: consider more?
+
+ // After this interval we will try to restart
+ optional uint64 TimeoutSeconds = 3 [default = 15];
+ }
+
optional TBackgroundCompactionConfig BackgroundCompactionConfig = 1;
+ optional TBorrowedCompactionConfig BorrowedCompactionConfig = 2;
}
// This message is used to upload custom service configs
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index bd0534b651..d68c7d1387 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -110,6 +110,7 @@ enum ECumulativeCounters {
COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START = 84 [(CounterOpts) = {Name: "TxCompactTableFailedStart"}];
COUNTER_FULL_COMPACTION_DONE = 85 [(CounterOpts) = {Name: "FullCompactionCount"}];
COUNTER_TX_BACKGROUND_COMPACTION_FAILED_LOANED = 86 [(CounterOpts) = {Name: "TxCompactTableFailedLoaned"}];
+ COUNTER_TX_COMPACT_BORROWED = 87 [(CounterOpts) = {Name: "TxCompactBorrowed"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 4a2188d4fb..570566e293 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -155,6 +155,9 @@ enum ESimpleCounters {
COUNTER_COMPACTION_QUEUE_WAITING_REPEAT = 126 [(CounterOpts) = {Name: "BackgroundCompactionQueueWaitingRepeat"}];
COUNTER_SHARDS_WITH_BORROWED_DATA = 127 [(CounterOpts) = {Name: "TableShardsWithBorrowedData"}];
COUNTER_SHARDS_WITH_LOANED_DATA = 128 [(CounterOpts) = {Name: "TableShardsWithLoanedData"}];
+
+ COUNTER_BORROWED_COMPACTION_QUEUE_SIZE = 129 [(CounterOpts) = {Name: "BorrowedCompactionQueueSize"}];
+ COUNTER_BORROWED_COMPACTION_QUEUE_RUNNING = 130 [(CounterOpts) = {Name: "BorrowedCompactionQueueRunning"}];
}
enum ECumulativeCounters {
@@ -246,6 +249,9 @@ enum ECumulativeCounters {
COUNTER_BACKGROUND_COMPACTION_BORROWED = 73 [(CounterOpts) = {Name: "BackgroundCompactionBorrowed"}];
COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED = 74 [(CounterOpts) = {Name: "BackgroundCompactionNotNeeded"}];
COUNTER_BACKGROUND_COMPACTION_LOANED = 75 [(CounterOpts) = {Name: "BackgroundCompactionLoaned"}];
+
+ COUNTER_BORROWED_COMPACTION_OK = 76 [(CounterOpts) = {Name: "BorrowedCompactionOK"}];
+ COUNTER_BORROWED_COMPACTION_TIMEOUT = 77 [(CounterOpts) = {Name: "BorrowedCompactionTimeout"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 001c68d222..32c041750e 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1430,6 +1430,11 @@ message TEvCompactBorrowed {
optional NKikimrProto.TPathID PathId = 1;
}
+message TEvCompactBorrowedResult {
+ optional uint64 TabletId = 1;
+ optional NKikimrProto.TPathID PathId = 2;
+}
+
message TEvGetCompactTableStats {
optional NKikimrProto.TPathID PathId = 1;
}
@@ -1437,6 +1442,7 @@ message TEvGetCompactTableStats {
message TEvGetCompactTableStatsResult {
optional uint64 BackgroundCompactionRequests = 1;
optional uint64 BackgroundCompactionCount = 2;
+ optional uint64 CompactBorrowedCount = 3;
}
// TEvRead is used to request multiple queries from a shard and
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index bf02891c77..8f135775cd 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -386,6 +386,11 @@ TAutoPtr<TSubset> TDatabase::ScanSnapshot(ui32 table, TRowVersion snapshot)
return Require(table)->ScanSnapshot(snapshot);
}
+bool TDatabase::HasBorrowed(ui32 table, ui64 selfTabletId) const
+{
+ return Require(table)->HasBorrowed(selfTabletId);
+}
+
TBundleSlicesMap TDatabase::LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const
{
return Require(table)->LookupSlices(bundles);
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index eda7bd4568..91bd2865f3 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -144,6 +144,8 @@ public:
TAutoPtr<TSubset> Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const;
TAutoPtr<TSubset> ScanSnapshot(ui32 table, TRowVersion snapshot = TRowVersion::Max());
+ bool HasBorrowed(ui32 table, ui64 selfTabletId) const;
+
TBundleSlicesMap LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const;
void ReplaceSlices(ui32 table, TBundleSlicesMap slices);
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 10242892e4..76ef090dd1 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -3812,6 +3812,11 @@ bool TExecutor::HasLoanedParts() const {
return false;
}
+bool TExecutor::HasBorrowed(ui32 table, ui64 selfTabletId) const {
+ Y_VERIFY_S(Database, "Checking borrowers of table# " << table << " for tablet# " << selfTabletId);
+ return Database->HasBorrowed(table, selfTabletId);
+}
+
const TExecutorStats& TExecutor::GetStats() const {
return *Stats;
}
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index b4d519b6e0..b9c6753bf0 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -649,6 +649,8 @@ public:
THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const override;
bool HasLoanedParts() const override;
+ bool HasBorrowed(ui32 table, ui64 selfTabletId) const override;
+
const TExecutorStats& GetStats() const override;
NMetrics::TResourceMetrics* GetResourceMetrics() const override;
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index 11b3b90199..f746954346 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -122,6 +122,23 @@ TAutoPtr<TSubset> TTable::Subset(TEpoch head) const noexcept
return subset;
}
+bool TTable::HasBorrowed(ui64 selfTabletId) const noexcept
+{
+ for (const auto &it : TxStatus)
+ if (it.second->Label.TabletID() != selfTabletId)
+ return true;
+
+ for (auto &it: Flatten)
+ if (it.second->Label.TabletID() != selfTabletId)
+ return true;
+
+ for (auto &it: ColdParts)
+ if (it.second->Label.TabletID() != selfTabletId)
+ return true;
+
+ return false;
+}
+
TAutoPtr<TSubset> TTable::ScanSnapshot(TRowVersion snapshot) noexcept
{
TAutoPtr<TSubset> subset = new TSubset(Epoch, Scheme);
@@ -915,16 +932,6 @@ TCompactionStats TTable::GetCompactionStats() const
stats.MemRowCount = GetMemRowCount();
stats.MemDataSize = GetMemSize();
stats.MemDataWaste = GetMemWaste();
-
- for (auto &it: ColdParts)
- stats.PartOwners.insert(it.second->Label.TabletID());
-
- for (auto &it: Flatten)
- stats.PartOwners.insert(it.second->Label.TabletID());
-
- for (auto &it: TxStatus)
- stats.PartOwners.insert(it.second->Label.TabletID());
-
stats.PartCount = Flatten.size() + ColdParts.size();
return stats;
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index a1fdeb9709..91dbf6f3fc 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -80,6 +80,8 @@ public:
TAutoPtr<TSubset> ScanSnapshot(TRowVersion snapshot = TRowVersion::Max()) noexcept;
TAutoPtr<TSubset> Unwrap() noexcept; /* full Subset(..) + final Replace(..) */
+ bool HasBorrowed(ui64 selfTabletId) const noexcept;
+
/**
* Returns current slices for bundles
*
diff --git a/ydb/core/tablet_flat/flat_table_stats.h b/ydb/core/tablet_flat/flat_table_stats.h
index 5b600dcf13..41b26a85fb 100644
--- a/ydb/core/tablet_flat/flat_table_stats.h
+++ b/ydb/core/tablet_flat/flat_table_stats.h
@@ -39,7 +39,6 @@ namespace NTable {
};
struct TCompactionStats {
- THashSet<ui64> PartOwners;
ui64 PartCount = 0;
ui64 MemRowCount = 0;
ui64 MemDataSize = 0;
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index 677106bc6b..f5949deb66 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -548,6 +548,8 @@ namespace NFlatExecutorSetup {
virtual THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const = 0;
virtual bool HasLoanedParts() const = 0;
+ virtual bool HasBorrowed(ui32 table, ui64 selfTabletId) const = 0;
+
// This method lets executor know about new yellow channels
virtual void OnYellowChannels(TVector<ui32> yellowMoveChannels, TVector<ui32> yellowStopChannels) = 0;
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 898e4a502b..1de4c368ce 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -299,6 +299,8 @@ struct TEvDataShard {
EvReadAck,
EvReadCancel,
+ EvCompactBorrowedResult,
+
EvEnd
};
@@ -946,7 +948,7 @@ struct TEvDataShard {
// Arrow
std::shared_ptr<arrow::RecordBatch> ArrowBatch;
-
+
private:
// for local events
TVector<TOwnedCellVec> Rows;
@@ -1396,23 +1398,47 @@ struct TEvDataShard {
/**
* This message is used to ask datashard to compact any borrowed parts it has
- * for the specified user table. No reply is expected for this message,
- * instead schemeshard expects to receive updated stats if and when
- * such a compaction has finished.
+ * for the specified user table.
*/
- struct TEvCompactBorrowed : public TEventPB<TEvCompactBorrowed, NKikimrTxDataShard::TEvCompactBorrowed, EvCompactBorrowed> {
+ struct TEvCompactBorrowed : public TEventPB<TEvCompactBorrowed,
+ NKikimrTxDataShard::TEvCompactBorrowed,
+ EvCompactBorrowed> {
TEvCompactBorrowed() = default;
- TEvCompactBorrowed(const TPathId& pathId) {
- Record.MutablePathId()->SetOwnerId(pathId.OwnerId);
- Record.MutablePathId()->SetLocalId(pathId.LocalPathId);
+ TEvCompactBorrowed(ui64 ownerId, ui64 localId) {
+ Record.MutablePathId()->SetOwnerId(ownerId);
+ Record.MutablePathId()->SetLocalId(localId);
}
+ TEvCompactBorrowed(const TPathId& pathId)
+ : TEvCompactBorrowed(pathId.OwnerId, pathId.LocalPathId)
+ { }
+
// Sanity check for safe merging to earlier versions
static_assert(EvCompactBorrowed == EventSpaceBegin(TKikimrEvents::ES_TX_DATASHARD) + 7 * 512 + 60,
"EvCompactBorrowed event has an unexpected value");
};
+ struct TEvCompactBorrowedResult : public TEventPB<TEvCompactBorrowedResult,
+ NKikimrTxDataShard::TEvCompactBorrowedResult,
+ EvCompactBorrowedResult> {
+ TEvCompactBorrowedResult() = default;
+
+ TEvCompactBorrowedResult(ui64 tabletId, ui64 ownerId, ui64 localId) {
+ Record.SetTabletId(tabletId);
+ Record.MutablePathId()->SetOwnerId(ownerId);
+ Record.MutablePathId()->SetLocalId(localId);
+ }
+
+ TEvCompactBorrowedResult(ui64 tabletId, const TPathId& pathId)
+ : TEvCompactBorrowedResult(tabletId, pathId.OwnerId, pathId.LocalPathId)
+ {}
+
+ // Sanity check for safe merging to earlier versions
+ static_assert(EvCompactBorrowedResult == EventSpaceBegin(TKikimrEvents::ES_TX_DATASHARD) + 7 * 512 + 68,
+ "EvCompactBorrowedResult event has an unexpected value");
+ };
+
struct TEvGetCompactTableStats : public TEventPB<TEvGetCompactTableStats, NKikimrTxDataShard::TEvGetCompactTableStats,
TEvDataShard::EvGetCompactTableStats> {
TEvGetCompactTableStats() = default;
diff --git a/ydb/core/tx/datashard/datashard__compact_borrowed.cpp b/ydb/core/tx/datashard/datashard__compact_borrowed.cpp
index 8d36ee99a3..736238a44f 100644
--- a/ydb/core/tx/datashard/datashard__compact_borrowed.cpp
+++ b/ydb/core/tx/datashard/datashard__compact_borrowed.cpp
@@ -21,38 +21,31 @@ public:
<< " for table " << pathId
<< " at tablet " << Self->TabletID());
+ auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(Self->TabletID(), pathId);
+
if (pathId.OwnerId != Self->GetPathOwnerId()) {
// Ignore unexpected owner
+ ctx.Send(Ev->Sender, std::move(response));
return true;
}
auto it = Self->TableInfos.find(pathId.LocalPathId);
if (it == Self->TableInfos.end()) {
// Ignore unexpected table (may normally happen with races)
+ ctx.Send(Ev->Sender, std::move(response));
return true;
}
const TUserTable& tableInfo = *it->second;
- auto subset = txc.DB.Subset(tableInfo.LocalTid, NTable::TEpoch::Max(), {}, {});
-
- THashSet<ui64> partOwners;
- NTable::GetPartOwners(*subset, partOwners);
-
- bool hasBorrowed = false;
- for (ui64 tabletId : partOwners) {
- if (tabletId != Self->TabletID()) {
- hasBorrowed = true;
- break;
- }
- }
-
+ bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID());
if (!hasBorrowed) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TEvCompactBorrowed request from " << Ev->Sender
<< " for table " << pathId
<< " has no borrowed parts"
<< " at tablet " << Self->TabletID());
+ ctx.Send(Ev->Sender, std::move(response));
return true;
}
@@ -61,7 +54,13 @@ public:
<< " for table " << pathId
<< " starting compaction for local table " << tableInfo.LocalTid
<< " at tablet " << Self->TabletID());
+
Self->Executor()->CompactBorrowed(tableInfo.LocalTid);
+ Self->IncCounter(COUNTER_TX_COMPACT_BORROWED);
+ ++tableInfo.Stats.CompactBorrowedCount;
+
+ Self->CompactBorrowedWaiters[tableInfo.LocalTid].emplace_back(Ev->Sender);
+
return true;
}
diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp
index 317767d736..70001db0b3 100644
--- a/ydb/core/tx/datashard/datashard__compaction.cpp
+++ b/ydb/core/tx/datashard/datashard__compaction.cpp
@@ -68,17 +68,7 @@ public:
++tableInfo.Stats.BackgroundCompactionRequests;
- auto stats = txc.DB.GetCompactionStats(localTid);
-
- bool hasBorrowed = false;
- if (stats.PartOwners.size() > 1) {
- hasBorrowed = true;
- } else if (stats.PartOwners.size() == 1) {
- if (*stats.PartOwners.begin() != Self->TabletID()) {
- hasBorrowed = true;
- }
- }
-
+ bool hasBorrowed = txc.DB.HasBorrowed(tableInfo.LocalTid, Self->TabletID());
if (hasBorrowed && !record.GetCompactBorrowed()) {
// normally we should not receive requests to compact in this case
// but in some rare cases like schemeshard restart we can
@@ -117,6 +107,7 @@ public:
return true;
}
+ auto stats = txc.DB.GetCompactionStats(localTid);
bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0;
bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0;
if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) {
@@ -151,7 +142,7 @@ public:
<< ", memtableRows# " << stats.MemRowCount);
Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION);
- Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, pathId, Ev->Sender));
+ Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, Ev->Sender));
++tableInfo.Stats.BackgroundCompactionCount;
} else {
// compaction failed, for now we don't care
@@ -207,6 +198,7 @@ void TDataShard::Handle(TEvDataShard::TEvCompactTable::TPtr& ev, const TActorCon
void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) {
auto finishedInfo = Executor()->GetFinishedCompactionInfo(tableId);
+ TLocalPathId localPathId = InvalidLocalPathId;
if (tableId >= Schema::MinLocalTid) {
for (auto& ti : TableInfos) {
if (ti.second->LocalTid != tableId && ti.second->ShadowTid != tableId)
@@ -223,40 +215,68 @@ void TDataShard::CompactionComplete(ui32 tableId, const TActorContext &ctx) {
}
ti.second->StatsNeedUpdate = true;
+ localPathId = ti.first;
UpdateTableStats(ctx);
break;
}
}
- ReplyCompactionWaiters(tableId, finishedInfo.Edge, ctx);
+ ReplyCompactionWaiters(tableId, localPathId, finishedInfo, ctx);
}
-void TDataShard::ReplyCompactionWaiters(ui32 tableId, ui64 edge, const TActorContext &ctx) {
+void TDataShard::ReplyCompactionWaiters(
+ ui32 tableId,
+ TLocalPathId localPathId,
+ const NTabletFlatExecutor::TFinishedCompactionInfo& compactionInfo,
+ const TActorContext &ctx)
+{
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"CompactionComplete of tablet# "<< TabletID()
<< ", table# " << tableId
- << ", finished edge# " << edge
+ << ", finished edge# " << compactionInfo.Edge
<< ", front# " << (CompactionWaiters[tableId].empty() ? 0UL : std::get<0>(CompactionWaiters[tableId].front())));
- while (!CompactionWaiters[tableId].empty()) {
+ auto& fullCompactionQueue = CompactionWaiters[tableId];
+ while (!fullCompactionQueue.empty()) {
const auto& waiter = CompactionWaiters[tableId].front();
- if (std::get<0>(waiter) > edge)
+ if (std::get<0>(waiter) > compactionInfo.Edge)
break;
- const auto& pathId = std::get<1>(waiter);
- const auto& sender = std::get<2>(waiter);
+ const auto& sender = std::get<1>(waiter);
auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>(
TabletID(),
- pathId.OwnerId,
- pathId.LocalPathId,
+ GetPathOwnerId(),
+ localPathId,
NKikimrTxDataShard::TEvCompactTableResult::OK);
ctx.Send(sender, std::move(response));
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Sending TEvCompactTableResult to# " << sender
- << "pathId# " << pathId);
+ << "pathId# " << TPathId(GetPathOwnerId(), localPathId));
- CompactionWaiters[tableId].pop_front();
+ fullCompactionQueue.pop_front();
+ }
+
+ auto& compactBorrowedQueue = CompactBorrowedWaiters[tableId];
+ if (!compactBorrowedQueue.empty()) {
+ const bool hasBorrowed = Executor()->HasBorrowed(tableId, TabletID());
+ if (!hasBorrowed) {
+ while (!compactBorrowedQueue.empty()) {
+ const auto& waiter = compactBorrowedQueue.front();
+
+ auto response = MakeHolder<TEvDataShard::TEvCompactBorrowedResult>(
+ TabletID(),
+ GetPathOwnerId(),
+ localPathId);
+ ctx.Send(waiter, std::move(response));
+
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Sending TEvCompactBorrowedResult to# " << waiter
+ << "pathId# " << TPathId(GetPathOwnerId(), localPathId));
+
+ compactBorrowedQueue.pop_front();
+ }
+ }
}
}
@@ -274,6 +294,7 @@ void TDataShard::Handle(TEvDataShard::TEvGetCompactTableStats::TPtr& ev, const T
const TUserTable& tableInfo = *it->second;
response->Record.SetBackgroundCompactionRequests(tableInfo.Stats.BackgroundCompactionRequests);
response->Record.SetBackgroundCompactionCount(tableInfo.Stats.BackgroundCompactionCount);
+ response->Record.SetCompactBorrowedCount(tableInfo.Stats.CompactBorrowedCount);
}
ctx.Send(ev->Sender, std::move(response));
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 8c7272e32b..e9ddb66d70 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1309,7 +1309,11 @@ public:
void CompactionComplete(ui32 tableId, const TActorContext &ctx) override;
void CompletedLoansChanged(const TActorContext &ctx) override;
- void ReplyCompactionWaiters(ui32 tableId, ui64 edge, const TActorContext &ctx);
+ void ReplyCompactionWaiters(
+ ui32 tableId,
+ TLocalPathId localPathId,
+ const NTabletFlatExecutor::TFinishedCompactionInfo& compactionInfo,
+ const TActorContext &ctx);
TUserTable::TSpecialUpdate SpecialUpdates(const NTable::TDatabase& db, const TTableId& tableId) const;
@@ -2198,8 +2202,8 @@ private:
// in
THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id
- // compactionId, tableId/ownerId, actorId
- using TCompactionWaiter = std::tuple<ui64, TPathId, TActorId>;
+ // compactionId, actorId
+ using TCompactionWaiter = std::tuple<ui64, TActorId>;
using TCompactionWaiterList = TList<TCompactionWaiter>;
// tableLocalTid -> waiters, note that compactionId is monotonically
@@ -2208,6 +2212,11 @@ private:
// from the front
THashMap<ui32, TCompactionWaiterList> CompactionWaiters;
+ using TCompactBorrowedWaiterList = TList<TActorId>;
+
+ // tableLocalTid -> waiters, similar to CompactionWaiters
+ THashMap<ui32, TCompactBorrowedWaiterList> CompactBorrowedWaiters;
+
struct TReplicationSourceOffsetsReceiveState {
// A set of tables for which we already received offsets
THashSet<TPathId> Received;
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 6bbd8dec1a..f3b6f8112c 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -320,6 +320,7 @@ struct TUserTable : public TThrRefBase {
ui64 RowCountResolution = 0;
ui64 BackgroundCompactionRequests = 0;
ui64 BackgroundCompactionCount = 0;
+ ui64 CompactBorrowedCount = 0;
NTable::TKeyAccessSample AccessStats;
void Update(NTable::TStats&& dataStats, ui64 indexSize, THashSet<ui64>&& partOwners, ui64 partCount, TInstant statsUpdateTime) {
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index f4fa7ace57..14d5530153 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -36,7 +36,7 @@ class TOperationQueueWithTimer
private:
NKikimrServices::EServiceKikimr ServiceId = NKikimrServices::EServiceKikimr(LogServiceId);
TActorId LongTimerId;
- TInstant When;
+ TMonotonic When;
public:
TOperationQueueWithTimer(const typename TBase::TConfig& config,
@@ -60,31 +60,32 @@ public:
TActorBase::PassAway();
}
- TInstant GetWakeupTime() const { return When; }
+ TDuration GetWakeupDelta() const { return When - const_cast<TThis*>(this)->Now(); }
private:
// ITimer, note that it is made private,
// since it should be called only from TBase
- void SetWakeupTimer(TInstant t) override {
- if (When > t)
+ void SetWakeupTimer(TDuration delta) override {
+ if (LongTimerId)
this->Send(LongTimerId, new TEvents::TEvPoison);
- When = t;
- auto delta = t - this->Now();
+ When = this->Now() + delta;
auto ctx = TActivationContext::ActorContextFor(TActorBase::SelfId());
LongTimerId = CreateLongTimer(ctx, delta,
new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue));
LOG_DEBUG_S(ctx, ServiceId,
- "Operation queue set NextWakeup# " << When << ", delta# " << delta.Seconds() << " seconds");
+ "Operation queue set wakeup after delta# " << delta.Seconds() << " seconds");
}
- TInstant Now() override {
- return AppData()->TimeProvider->Now();
+ TMonotonic Now() override {
+ return AppData()->MonotonicTimeProvider->Now();
}
void HandleWakeup(const TActorContext &ctx) {
- LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup# " << this->Now());
+ LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup");
+ When = {};
+ LongTimerId = {};
TBase::Wakeup();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp
new file mode 100644
index 0000000000..1da2fd32e6
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard__borrowed_compaction.cpp
@@ -0,0 +1,161 @@
+#include "schemeshard_impl.h"
+
+namespace NKikimr::NSchemeShard {
+
+NOperationQueue::EStartStatus TSchemeShard::StartBorrowedCompaction(const TShardIdx& shardIdx) {
+ UpdateBorrowedCompactionQueueMetrics();
+
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+
+ auto it = ShardInfos.find(shardIdx);
+ if (it == ShardInfos.end()) {
+ LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info "
+ "for borrowed compaction# " << shardIdx
+ << " at schemeshard# " << TabletID());
+
+ return NOperationQueue::EStartStatus::EOperationRemove;
+ }
+
+ const auto& datashardId = it->second.TabletID;
+ const auto& pathId = it->second.PathId;
+
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBorrowedCompaction "
+ "for pathId# " << pathId << ", datashard# " << datashardId
+ << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta()
+ << ", rate# " << BorrowedCompactionQueue->GetRate()
+ << ", in queue# " << BorrowedCompactionQueue->Size() << " shards"
+ << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards"
+ << " at schemeshard " << TabletID());
+
+ std::unique_ptr<TEvDataShard::TEvCompactBorrowed> request(
+ new TEvDataShard::TEvCompactBorrowed(pathId.OwnerId, pathId.LocalPathId));
+
+ RunningBorrowedCompactions[shardIdx] = PipeClientCache->Send(
+ ctx,
+ ui64(datashardId),
+ request.release());
+
+ return NOperationQueue::EStartStatus::EOperationRunning;
+}
+
+void TSchemeShard::OnBorrowedCompactionTimeout(const TShardIdx& shardIdx) {
+ UpdateBorrowedCompactionQueueMetrics();
+ TabletCounters->Cumulative()[COUNTER_BORROWED_COMPACTION_TIMEOUT].Increment(1);
+
+ RunningBorrowedCompactions.erase(shardIdx);
+
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+
+ auto it = ShardInfos.find(shardIdx);
+ if (it == ShardInfos.end()) {
+ LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info "
+ "for timeout borrowed compaction# " << shardIdx
+ << " at schemeshard# " << TabletID());
+ return;
+ }
+
+ const auto& datashardId = it->second.TabletID;
+ const auto& pathId = it->second.PathId;
+
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Borrowed compaction timeout "
+ "for pathId# " << pathId << ", datashard# " << datashardId
+ << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta()
+ << ", in queue# " << BorrowedCompactionQueue->Size() << " shards"
+ << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards"
+ << " at schemeshard " << TabletID());
+
+ // retry
+ EnqueueBorrowedCompaction(shardIdx);
+}
+
+void TSchemeShard::BorrowedCompactionHandleDisconnect(TTabletId tabletId, const TActorId& clientId) {
+ auto tabletIt = TabletIdToShardIdx.find(tabletId);
+ if (tabletIt == TabletIdToShardIdx.end())
+ return; // just sanity check
+ const auto& shardIdx = tabletIt->second;
+
+ auto it = RunningBorrowedCompactions.find(shardIdx);
+ if (it == RunningBorrowedCompactions.end())
+ return;
+
+ if (it->second != clientId)
+ return;
+
+ RunningBorrowedCompactions.erase(it);
+
+ // Disconnected from the node where we requested borrowed compaction. We just resend the request, because it
+ // is safe: if the first request is executing or has already been executed, then the second request will be ignored.
+
+ StartBorrowedCompaction(shardIdx);
+}
+
+void TSchemeShard::EnqueueBorrowedCompaction(const TShardIdx& shardIdx) {
+ if (!BorrowedCompactionQueue)
+ return;
+
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "borrowed compaction enqueue shard# " << shardIdx << " at schemeshard " << TabletID());
+
+ BorrowedCompactionQueue->Enqueue(shardIdx);
+ UpdateBorrowedCompactionQueueMetrics();
+}
+
+void TSchemeShard::RemoveBorrowedCompaction(const TShardIdx& shardIdx) {
+ if (!BorrowedCompactionQueue)
+ return;
+
+ RunningBorrowedCompactions.erase(shardIdx);
+ BorrowedCompactionQueue->Remove(shardIdx);
+ UpdateBorrowedCompactionQueueMetrics();
+}
+
+void TSchemeShard::UpdateBorrowedCompactionQueueMetrics() {
+ if (!BorrowedCompactionQueue)
+ return;
+
+ TabletCounters->Simple()[COUNTER_BORROWED_COMPACTION_QUEUE_SIZE].Set(BorrowedCompactionQueue->Size());
+ TabletCounters->Simple()[COUNTER_BORROWED_COMPACTION_QUEUE_RUNNING].Set(BorrowedCompactionQueue->RunningSize());
+}
+
+void TSchemeShard::Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx) {
+ const auto& record = ev->Get()->Record;
+
+ const TTabletId tabletId(record.GetTabletId());
+ const TShardIdx shardIdx = GetShardIdx(tabletId);
+
+ auto pathId = TPathId(
+ record.GetPathId().GetOwnerId(),
+ record.GetPathId().GetLocalId());
+
+ auto duration = BorrowedCompactionQueue->OnDone(shardIdx);
+
+ if (shardIdx == InvalidShardIdx) {
+ LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished borrowed compaction of unknown shard "
+ "for pathId# " << pathId << ", datashard# " << tabletId
+ << " in# " << duration.MilliSeconds()
+ << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta()
+ << ", rate# " << BorrowedCompactionQueue->GetRate()
+ << ", in queue# " << BorrowedCompactionQueue->Size() << " shards"
+ << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards"
+ << " at schemeshard " << TabletID());
+ } else {
+ LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished borrowed compaction "
+ "for pathId# " << pathId << ", datashard# " << tabletId
+ << ", shardIdx# " << shardIdx
+ << " in# " << duration.MilliSeconds()
+ << ", next wakeup# " << BorrowedCompactionQueue->GetWakeupDelta()
+ << ", rate# " << BorrowedCompactionQueue->GetRate()
+ << ", in queue# " << BorrowedCompactionQueue->Size() << " shards"
+ << ", running# " << BorrowedCompactionQueue->RunningSize() << " shards"
+ << " at schemeshard " << TabletID());
+ }
+
+ RunningBorrowedCompactions.erase(shardIdx);
+
+ TabletCounters->Cumulative()[COUNTER_BORROWED_COMPACTION_OK].Increment(1);
+ UpdateBorrowedCompactionQueueMetrics();
+}
+
+} // NKikimr::NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
index d16e5b0ae8..810be23a9d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
@@ -1,7 +1,6 @@
#include "schemeshard_impl.h"
-namespace NKikimr {
-namespace NSchemeShard {
+namespace NKikimr::NSchemeShard {
NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardCompactionInfo& info) {
UpdateBackgroundCompactionQueueMetrics();
@@ -24,7 +23,7 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction "
"for pathId# " << pathId << ", datashard# " << datashardId
<< ", compactionInfo# " << info
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
@@ -65,7 +64,7 @@ void TSchemeShard::OnBackgroundCompactionTimeout(const TShardCompactionInfo& inf
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout "
"for pathId# " << pathId << ", datashard# " << datashardId
<< ", compactionInfo# " << info
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
@@ -94,7 +93,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction of unknown shard "
"for pathId# " << pathId << ", datashard# " << tabletId
<< " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus()
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
@@ -105,7 +104,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
"for pathId# " << pathId << ", datashard# " << tabletId
<< ", shardIdx# " << shardIdx
<< " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus()
- << ", next wakeup# " << CompactionQueue->GetWakeupTime()
+ << ", next wakeup in# " << CompactionQueue->GetWakeupDelta()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
<< ", waiting after compaction# " << CompactionQueue->WaitingSize() << " shards"
@@ -138,7 +137,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
UpdateBackgroundCompactionQueueMetrics();
}
-void TSchemeShard::EnqueueCompaction(
+void TSchemeShard::EnqueueBackgroundCompaction(
const TShardIdx& shardIdx,
const TTableInfo::TPartitionStats& stats)
{
@@ -173,7 +172,7 @@ void TSchemeShard::EnqueueCompaction(
UpdateBackgroundCompactionQueueMetrics();
}
-void TSchemeShard::UpdateCompaction(
+void TSchemeShard::UpdateBackgroundCompaction(
const TShardIdx& shardIdx,
const TTableInfo::TPartitionStats& newStats)
{
@@ -186,7 +185,7 @@ void TSchemeShard::UpdateCompaction(
LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"background compaction update removed shard# " << shardIdx
<< " with borrowed parts at schemeshard " << TabletID());
- RemoveCompaction(shardIdx);
+ RemoveBackgroundCompaction(shardIdx);
return;
}
@@ -194,7 +193,7 @@ void TSchemeShard::UpdateCompaction(
LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"background compaction update removed shard# " << shardIdx
<< " with loaned parts at schemeshard " << TabletID());
- RemoveCompaction(shardIdx);
+ RemoveBackgroundCompaction(shardIdx);
return;
}
@@ -212,7 +211,7 @@ void TSchemeShard::UpdateCompaction(
UpdateBackgroundCompactionQueueMetrics();
}
-void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) {
+void TSchemeShard::RemoveBackgroundCompaction(const TShardIdx& shardIdx) {
if (!CompactionQueue)
return;
@@ -221,11 +220,15 @@ void TSchemeShard::RemoveCompaction(const TShardIdx& shardIdx) {
}
void TSchemeShard::ShardRemoved(const TShardIdx& shardIdx) {
- RemoveCompaction(shardIdx);
+ RemoveBackgroundCompaction(shardIdx);
+ RemoveBorrowedCompaction(shardIdx);
RemoveShardMetrics(shardIdx);
}
void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() {
+ if (!CompactionQueue)
+ return;
+
TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_SIZE].Set(CompactionQueue->Size());
TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_RUNNING].Set(CompactionQueue->RunningSize());
TabletCounters->Simple()[COUNTER_COMPACTION_QUEUE_WAITING_REPEAT].Set(CompactionQueue->WaitingSize());
@@ -300,5 +303,4 @@ void TSchemeShard::RemoveShardMetrics(const TShardIdx& shardIdx) {
PartitionMetricsMap.erase(it);
}
-} // NSchemeShard
-} // NKikimr
+} // NKikimr::NSchemeShard
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
index 1945d3cfed..1da7c610a4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.cpp
@@ -380,7 +380,7 @@ void NTableState::UpdatePartitioningForCopyTable(TOperationId operationId, TTxSt
NIceDb::TNiceDb db(context.GetDB());
// Erase previous partitioning as we are going to generate new one
- context.SS->DeleteTablePartitioning(db, txState.TargetPathId, dstTableInfo);
+ context.SS->PersistTablePartitioningDeletion(db, txState.TargetPathId, dstTableInfo);
// Remove old shardIdx info and old txShards
for (const auto& shard : txState.Shards) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
index 9a4bfc7daf..055525640f 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
@@ -290,10 +290,11 @@ public:
auto oldAggrStats = tableInfo->GetStats().Aggregated;
// Delete the whole old partitioning and persist the whole new partitionig as the indexes have changed
- context.SS->DeleteTablePartitioning(db, tableId, tableInfo);
+ context.SS->PersistTablePartitioningDeletion(db, tableId, tableInfo);
context.SS->SetPartitioning(tableId, tableInfo, std::move(newPartitioning));
context.SS->PersistTablePartitioning(db, tableId, tableInfo);
context.SS->PersistTablePartitionStats(db, tableId, tableInfo);
+
context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_ACTIVE_COUNT].Sub(allSrcShardIdxs.size());
context.SS->TabletCounters->Simple()[COUNTER_TABLE_SHARD_INACTIVE_COUNT].Add(allSrcShardIdxs.size());
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index a464fc39f9..065836b85d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -194,10 +194,14 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
table->UpdateShardStats(shardIdx, newStats);
if (!table->IsBackup) {
- Self->UpdateCompaction(shardIdx, newStats);
+ Self->UpdateBackgroundCompaction(shardIdx, newStats);
Self->UpdateShardMetrics(shardIdx, newStats);
}
+ if (!newStats.HasBorrowedData) {
+ Self->RemoveBorrowedCompaction(shardIdx);
+ }
+
NIceDb::TNiceDb db(txc.DB);
if (!table->IsBackup && !table->IsShardsStatsDetached()) {
@@ -333,7 +337,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
if (newStats.HasBorrowedData) {
// We don't want to split shards that have borrow parts
// We must ask them to compact first
- CompactEv.Reset(new TEvDataShard::TEvCompactBorrowed(tableId));
+ Self->EnqueueBorrowedCompaction(shardIdx);
return true;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 62a50500e0..83c4b64bfc 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -90,7 +90,7 @@ void TSchemeShard::ActivateAfterInitialization(const TActorContext& ctx,
ScheduleCleanDroppedPaths();
ScheduleCleanDroppedSubDomains();
- StartStopCompactionQueue();
+ StartStopCompactionQueues();
ctx.Send(TxAllocatorClient, MakeHolder<TEvTxAllocatorClient::TEvAllocate>(InitiateCachedTxIdsCount));
@@ -285,10 +285,17 @@ TMessageSeqNo TSchemeShard::NextRound() {
void TSchemeShard::Clear() {
PathsById.clear();
Tables.clear();
+
if (CompactionQueue) {
CompactionQueue->Clear();
UpdateBackgroundCompactionQueueMetrics();
}
+
+ if (BorrowedCompactionQueue) {
+ BorrowedCompactionQueue->Clear();
+ UpdateBorrowedCompactionQueueMetrics();
+ }
+
ShardsWithBorrowed.clear();
ShardsWithLoaned.clear();
PersQueueGroups.clear();
@@ -2078,7 +2085,7 @@ void TSchemeShard::PersistTablePartitioning(NIceDb::TNiceDb& db, const TPathId p
}
}
-void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
+void TSchemeShard::PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
const auto& partitions = tableInfo->GetPartitions();
for (ui64 pi = 0; pi < partitions.size(); ++pi) {
if (IsLocalId(pathId)) {
@@ -2086,8 +2093,6 @@ void TSchemeShard::DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId pa
}
db.Table<Schema::MigratedTablePartitions>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete();
db.Table<Schema::TablePartitionStats>().Key(pathId.OwnerId, pathId.LocalPathId, pi).Delete();
-
- ShardRemoved(partitions[pi].ShardIdx);
}
}
@@ -3679,6 +3684,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info)
new TPipeClientFactory(this)))
, PipeTracker(*PipeClientCache)
, CompactionStarter(this)
+ , BorrowedCompactionStarter(this)
, ShardDeleter(info->TabletID)
, AllowDataColumnForIndexTable(0, 0, 1)
, EnableAsyncIndexes(0, 0, 1)
@@ -3754,6 +3760,9 @@ void TSchemeShard::Die(const TActorContext &ctx) {
if (CompactionQueue)
CompactionQueue->Shutdown(ctx);
+ if (BorrowedCompactionQueue)
+ BorrowedCompactionQueue->Shutdown(ctx);
+
return IActor::Die(ctx);
}
@@ -3789,10 +3798,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
EnableBackgroundCompaction = appData->FeatureFlags.GetEnableBackgroundCompaction();
EnableBackgroundCompactionServerless = appData->FeatureFlags.GetEnableBackgroundCompactionServerless();
- if (!CompactionQueue)
- ConfigureCompactionQueue(appData->CompactionConfig.GetBackgroundCompactionConfig(), ctx);
- ctx.RegisterWithSameMailbox(CompactionQueue);
+ ConfigureCompactionQueues(appData->CompactionConfig, ctx);
if (appData->ChannelProfiles) {
ChannelProfiles = appData->ChannelProfiles;
@@ -3973,6 +3980,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvSchemeShard::TEvMigrateSchemeShardResult, Handle);
HFuncTraced(TEvDataShard::TEvMigrateSchemeShardResponse, Handle);
HFuncTraced(TEvDataShard::TEvCompactTableResult, Handle);
+ HFuncTraced(TEvDataShard::TEvCompactBorrowedResult, Handle);
HFuncTraced(TEvSchemeShard::TEvSyncTenantSchemeShard, Handle);
HFuncTraced(TEvSchemeShard::TEvUpdateTenantSchemeShard, Handle);
@@ -4613,6 +4621,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TAc
<< ", to tablet: " << tabletId
<< ", at schemeshard: " << TabletID());
+ BorrowedCompactionHandleDisconnect(tabletId, clientId);
RestartPipeTx(tabletId, ctx);
}
@@ -4650,6 +4659,7 @@ void TSchemeShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TAc
return;
}
+ BorrowedCompactionHandleDisconnect(tabletId, clientId);
RestartPipeTx(tabletId, ctx);
}
@@ -5826,14 +5836,32 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, T
}
if (!tableInfo->IsBackup) {
+ // partitions updated:
+ // 1. We need to remove some parts from compaction queues.
+ // 2. We need to add new ones to the queues. Since we can safely enqueue already
+ // enqueued parts, we simply enqueue each part in newPartitioning
+ THashSet<TShardIdx> newPartitioningSet;
+ newPartitioningSet.reserve(newPartitioning.size());
+ const auto& oldPartitioning = tableInfo->GetPartitions();
+
for (const auto& p: newPartitioning) {
+ if (!oldPartitioning.empty())
+ newPartitioningSet.insert(p.ShardIdx);
+
const auto& partitionStats = tableInfo->GetStats().PartitionStats;
auto it = partitionStats.find(p.ShardIdx);
if (it != partitionStats.end()) {
- EnqueueCompaction(p.ShardIdx, it->second);
+ EnqueueBackgroundCompaction(p.ShardIdx, it->second);
UpdateShardMetrics(p.ShardIdx, it->second);
}
}
+
+ for (const auto& p: oldPartitioning) {
+ if (!newPartitioningSet.contains(p.ShardIdx)) {
+ // note that queues might not contain the shard
+ ShardRemoved(p.ShardIdx);
+ }
+ }
}
tableInfo->SetPartitioning(std::move(newPartitioning));
@@ -5984,13 +6012,11 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
if (appConfig.HasCompactionConfig()) {
const auto& compactionConfig = appConfig.GetCompactionConfig();
- if (compactionConfig.HasBackgroundCompactionConfig()) {
- ConfigureCompactionQueue(compactionConfig.GetBackgroundCompactionConfig(), ctx);
- }
+ ConfigureCompactionQueues(compactionConfig, ctx);
}
if (IsShemeShardConfigured()) {
- StartStopCompactionQueue();
+ StartStopCompactionQueues();
}
}
@@ -6017,7 +6043,24 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu
EnableBackgroundCompactionServerless = featureFlags.GetEnableBackgroundCompactionServerless();
}
-void TSchemeShard::ConfigureCompactionQueue(
+// Configures both compaction queues (background and borrowed) from the given
+// compaction config. When a section is not present in the config, the matching
+// queue is configured from a default-constructed section instead, so both
+// Configure*Queue() calls are always made.
+void TSchemeShard::ConfigureCompactionQueues(
+        const NKikimrConfig::TCompactionConfig& compactionConfig,
+        const TActorContext &ctx)
+{
+    using TBackgroundSection = NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig;
+    using TBorrowedSection = NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig;
+
+    // Fallbacks used when the corresponding section is absent.
+    const TBackgroundSection defaultBackground;
+    const TBorrowedSection defaultBorrowed;
+
+    const TBackgroundSection& backgroundSection = compactionConfig.HasBackgroundCompactionConfig()
+        ? compactionConfig.GetBackgroundCompactionConfig()
+        : defaultBackground;
+    ConfigureBackgroundCompactionQueue(backgroundSection, ctx);
+
+    const TBorrowedSection& borrowedSection = compactionConfig.HasBorrowedCompactionConfig()
+        ? compactionConfig.GetBorrowedCompactionConfig()
+        : defaultBorrowed;
+    ConfigureBorrowedCompactionQueue(borrowedSection, ctx);
+}
+
+void TSchemeShard::ConfigureBackgroundCompactionQueue(
const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config,
const TActorContext &ctx)
{
@@ -6048,10 +6091,11 @@ void TSchemeShard::ConfigureCompactionQueue(
compactionConfig,
queueConfig,
CompactionStarter);
+ ctx.RegisterWithSameMailbox(CompactionQueue);
}
LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "CompactionQueue configured: Timeout# " << compactionConfig.Timeout
+ "BackgroundCompactionQueue configured: Timeout# " << compactionConfig.Timeout
<< ", compact single parted# " << (queueConfig.CompactSinglePartedShards ? "yes" : "no")
<< ", Rate# " << CompactionQueue->GetRate()
<< ", WakeupInterval# " << compactionConfig.WakeupInterval
@@ -6061,7 +6105,34 @@ void TSchemeShard::ConfigureCompactionQueue(
<< ", MaxRate# " << compactionConfig.MaxRate);
}
-void TSchemeShard::StartStopCompactionQueue() {
+// (Re)configures the queue used for compaction of borrowed data.
+//
+// Timeout, InflightLimit and MaxRate are copied from `config` and the queue is
+// marked non-circular. An existing queue receives the new settings through
+// UpdateConfig(); otherwise the queue is created with BorrowedCompactionStarter
+// and registered in the same mailbox. The resulting settings are logged at
+// NOTICE level.
+void TSchemeShard::ConfigureBorrowedCompactionQueue(
+        const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config,
+        const TActorContext &ctx)
+{
+    TBorrowedCompactionQueue::TConfig compactionConfig;
+
+    compactionConfig.IsCircular = false;
+    compactionConfig.Timeout = TDuration::Seconds(config.GetTimeoutSeconds());
+    compactionConfig.InflightLimit = config.GetInflightLimit();
+    compactionConfig.MaxRate = config.GetMaxRate();
+
+    if (BorrowedCompactionQueue) {
+        BorrowedCompactionQueue->UpdateConfig(compactionConfig);
+    } else {
+        BorrowedCompactionQueue = new TBorrowedCompactionQueue(
+            compactionConfig,
+            BorrowedCompactionStarter);
+        ctx.RegisterWithSameMailbox(BorrowedCompactionQueue);
+    }
+
+    // MaxRate is taken from the config above, so report it alongside the
+    // other applied settings (the background queue log prints it as well).
+    LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+        "BorrowedCompactionQueue configured: Timeout# " << compactionConfig.Timeout
+        << ", Rate# " << BorrowedCompactionQueue->GetRate()
+        << ", WakeupInterval# " << compactionConfig.WakeupInterval
+        << ", InflightLimit# " << compactionConfig.InflightLimit
+        << ", MaxRate# " << compactionConfig.MaxRate);
+}
+
+void TSchemeShard::StartStopCompactionQueues() {
// note, that we don't need to check current state of compaction queue
if (IsServerlessDomain(TPath::Init(RootPathId(), this))) {
if (EnableBackgroundCompactionServerless) {
@@ -6076,6 +6147,8 @@ void TSchemeShard::StartStopCompactionQueue() {
CompactionQueue->Stop();
}
}
+
+ BorrowedCompactionQueue->Start();
}
void TSchemeShard::Handle(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr &, const TActorContext &ctx) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 99df4e7b65..e7fcebbeed 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -99,6 +99,30 @@ private:
TSchemeShard* Self;
};
+    // Queue type for compaction of borrowed data: TShardIdx items held in a
+    // TFifoQueue.
+    using TBorrowedCompactionQueue = NOperationQueue::TOperationQueueWithTimer<
+        TShardIdx,
+        TFifoQueue<TShardIdx>,
+        TEvPrivate::EvRunBorrowedCompaction,
+        NKikimrServices::FLAT_TX_SCHEMESHARD>;
+
+    // Forwards the borrowed compaction queue's IStarter callbacks to the
+    // owning TSchemeShard.
+    class TBorrowedCompactionStarter : public TBorrowedCompactionQueue::IStarter {
+    public:
+        // explicit: a TSchemeShard* must never silently convert into a starter.
+        explicit TBorrowedCompactionStarter(TSchemeShard* self)
+            : Self(self)
+        { }
+
+        // Delegates to TSchemeShard::StartBorrowedCompaction() and returns its status.
+        NOperationQueue::EStartStatus StartOperation(const TShardIdx& shardIdx) override {
+            return Self->StartBorrowedCompaction(shardIdx);
+        }
+
+        // Delegates to TSchemeShard::OnBorrowedCompactionTimeout().
+        void OnTimeout(const TShardIdx& shardIdx) override {
+            Self->OnBorrowedCompactionTimeout(shardIdx);
+        }
+
+    private:
+        TSchemeShard* Self; // back-pointer to the schemeshard; never deleted by this class
+    };
+
public:
static constexpr ui32 DefaultPQTabletPartitionsCount = 1;
static constexpr ui32 MaxPQTabletPartitionsCount = 1000;
@@ -207,6 +231,13 @@ public:
TCompactionStarter CompactionStarter;
TCompactionQueue* CompactionQueue = nullptr;
+
+ TBorrowedCompactionStarter BorrowedCompactionStarter;
+ TBorrowedCompactionQueue* BorrowedCompactionQueue = nullptr;
+
+ // shardIdx -> clientId
+ THashMap<TShardIdx, TActorId> RunningBorrowedCompactions;
+
THashSet<TShardIdx> ShardsWithBorrowed; // shards have parts from another shards
THashSet<TShardIdx> ShardsWithLoaned; // shards have parts loaned to another shards
bool EnableBackgroundCompaction = false;
@@ -298,10 +329,19 @@ public:
void ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfig, const TActorContext& ctx);
void ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featureFlags, const TActorContext& ctx);
- void ConfigureCompactionQueue(
+ void ConfigureCompactionQueues(
+ const NKikimrConfig::TCompactionConfig& config,
+ const TActorContext &ctx);
+
+ void ConfigureBackgroundCompactionQueue(
const NKikimrConfig::TCompactionConfig::TBackgroundCompactionConfig& config,
const TActorContext &ctx);
- void StartStopCompactionQueue();
+
+ void ConfigureBorrowedCompactionQueue(
+ const NKikimrConfig::TCompactionConfig::TBorrowedCompactionConfig& config,
+ const TActorContext &ctx);
+
+ void StartStopCompactionQueues();
bool ApplyStorageConfig(const TStoragePools& storagePools,
const NKikimrSchemeOp::TStorageConfig& storageConfig,
@@ -464,7 +504,7 @@ public:
void PersistTable(NIceDb::TNiceDb &db, const TPathId pathId);
void PersistChannelsBinding(NIceDb::TNiceDb& db, const TShardIdx shardId, const TChannelsBindings& bindedChannels);
void PersistTablePartitioning(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
- void DeleteTablePartitioning(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo);
+ void PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo);
void PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPathId& pathId, ui64 id, const TTableInfo::TPtr tableInfo);
void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TTableInfo::TPartitionStats& stats);
void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TShardIdx& shardIdx, const TTableInfo::TPtr tableInfo);
@@ -640,9 +680,12 @@ public:
void ScheduleCleanDroppedPaths();
void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx);
- void EnqueueCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats);
- void UpdateCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats);
- void RemoveCompaction(const TShardIdx& shardIdx);
+ void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats);
+ void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats);
+ void RemoveBackgroundCompaction(const TShardIdx& shardIdx);
+
+ void EnqueueBorrowedCompaction(const TShardIdx& shardIdx);
+ void RemoveBorrowedCompaction(const TShardIdx& shardIdx);
void UpdateShardMetrics(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats);
void RemoveShardMetrics(const TShardIdx& shardIdx);
@@ -653,6 +696,11 @@ public:
void OnBackgroundCompactionTimeout(const TShardCompactionInfo& info);
void UpdateBackgroundCompactionQueueMetrics();
+ NOperationQueue::EStartStatus StartBorrowedCompaction(const TShardIdx& shardIdx);
+ void OnBorrowedCompactionTimeout(const TShardIdx& shardIdx);
+ void BorrowedCompactionHandleDisconnect(TTabletId tabletId, const TActorId& clientId);
+ void UpdateBorrowedCompactionQueueMetrics();
+
struct TTxCleanDroppedSubDomains;
NTabletFlatExecutor::ITransaction* CreateTxCleanDroppedSubDomains();
@@ -817,6 +865,7 @@ public:
void Handle(TEvSchemeShard::TEvMigrateSchemeShardResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvMigrateSchemeShardResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx);
void Handle(TEvSchemeShard::TEvSyncTenantSchemeShard::TPtr& ev, const TActorContext& ctx);
void Handle(TEvSchemeShard::TEvUpdateTenantSchemeShard::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h
index 207fd8d2c7..751de0231e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_private.h
+++ b/ydb/core/tx/schemeshard/schemeshard_private.h
@@ -21,6 +21,7 @@ struct TEvPrivate {
EvSubscribeToShardDeletion,
EvNotifyShardDeleted,
EvRunBackgroundCompaction,
+ EvRunBorrowedCompaction,
EvCompletePublication,
EvCompleteBarrier,
EvEnd
diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp
index 8d768d3aa0..4da2c1e22b 100644
--- a/ydb/core/tx/schemeshard/ut_compaction.cpp
+++ b/ydb/core/tx/schemeshard/ut_compaction.cpp
@@ -12,6 +12,9 @@ using namespace NSchemeShardUT_Private;
namespace {
+constexpr TDuration DefaultTimeout = TDuration::Seconds(30);
+constexpr TDuration RetryDelay = TDuration::Seconds(1);
+
using TTableInfoMap = THashMap<TString, NKikimrTxDataShard::TEvGetInfoResponse::TUserTable>;
TShardCompactionInfo MakeCompactionInfo(ui64 idx, ui64 ts, ui64 sh = 0, ui64 d = 0) {
@@ -70,25 +73,13 @@ TPathInfo GetPathInfo(
return info;
}
-void CreateTableWithData(
+void WriteData(
TTestActorRuntime &runtime,
- TTestEnv& env,
- const char* path,
const char* name,
- ui32 shardsCount,
- ui64& txId,
- ui64 schemeshardId = TTestTxConfig::SchemeShard)
+ ui64 fromKeyInclusive,
+ ui64 toKey,
+ ui64 tabletId = TTestTxConfig::FakeHiveTablets)
{
- TestCreateTable(runtime, schemeshardId, ++txId, path,
- Sprintf(R"____(
- Name: "%s"
- Columns { Name: "key" Type: "Uint64"}
- Columns { Name: "value" Type: "Utf8"}
- KeyColumnNames: ["key"]
- UniformPartitionsCount: %d
- )____", name, shardsCount));
- env.TestWaitNotification(runtime, txId, schemeshardId);
-
auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) {
TString writeQuery = Sprintf(R"(
(
@@ -104,37 +95,82 @@ void CreateTableWithData(
UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
};
- for (ui64 key = 0; key < 100; ++key) {
- fnWriteRow(TTestTxConfig::FakeHiveTablets, key, name);
+ for (ui64 key = fromKeyInclusive; key < toKey; ++key) {
+ fnWriteRow(tabletId, key, name);
}
}
-void SetFeatures(
+// Creates table `name` under `path` with `shardsCount` uniform partitions and a
+// partitioning policy whose min and max partition counts are both `shardsCount`,
+// waits for the creation tx notification, then writes rows with keys [0, 100)
+// through WriteData() (using its default tablet id).
+//
+// `txId` is incremented and used for the create-table operation.
+void CreateTableWithData(
+        TTestActorRuntime &runtime,
+        TTestEnv& env,
+        const char* path,
+        const char* name,
+        ui32 shardsCount,
+        ui64& txId,
+        ui64 schemeshardId = TTestTxConfig::SchemeShard)
+{
+    // shardsCount is unsigned, so it is formatted with %u rather than %d.
+    TestCreateTable(runtime, schemeshardId, ++txId, path,
+        Sprintf(R"____(
+            Name: "%s"
+            Columns { Name: "key" Type: "Uint64"}
+            Columns { Name: "value" Type: "Utf8"}
+            KeyColumnNames: ["key"]
+            UniformPartitionsCount: %u
+            PartitionConfig {
+                PartitioningPolicy {
+                    MinPartitionsCount: %u
+                    MaxPartitionsCount: %u
+                }
+            }
+        )____", name, shardsCount, shardsCount, shardsCount));
+    env.TestWaitNotification(runtime, txId, schemeshardId);
+
+    WriteData(runtime, name, 0, 100);
+}
+
+void SetConfig( // sends `request` to the schemeshard tablet over a pipe, then grabs the TEvConfigNotificationResponse edge event
TTestActorRuntime &runtime,
- TTestEnv&,
ui64 schemeShard,
- const NKikimrConfig::TFeatureFlags& features)
+ THolder<NConsole::TEvConsole::TEvConfigNotificationRequest> request)
{
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries()); // ownership of the request is released to SendToPipe
+
+ TAutoPtr<IEventHandle> handle;
+ runtime.GrabEdgeEventRethrow<NConsole::TEvConsole::TEvConfigNotificationResponse>(handle);
+}
+
+THolder<NConsole::TEvConsole::TEvConfigNotificationRequest> GetTestCompactionConfig() { // builds a config notification request carrying the compaction settings used by these tests
auto request = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationRequest>();
- *request->Record.MutableConfig()->MutableFeatureFlags() = features;
- // little hack to simplify life
+ // little hacks to simplify life
auto* compactionConfig = request->Record.MutableConfig()->MutableCompactionConfig();
compactionConfig->MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig->MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ compactionConfig->MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds());
+ compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds()); // NOTE(review): overwritten by SetMinCompactionRepeatDelaySeconds(0) two statements below - confirm which value is intended
// 1 compaction / second
compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
compactionConfig->MutableBackgroundCompactionConfig()->SetMaxRate(1);
compactionConfig->MutableBackgroundCompactionConfig()->SetRoundSeconds(0);
- auto sender = runtime.AllocateEdgeActor();
+ compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); // borrowed compaction: one in-flight operation at a time
- runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries());
+ return request;
+}
- TAutoPtr<IEventHandle> handle;
- runtime.GrabEdgeEventRethrow<NConsole::TEvConsole::TEvConfigNotificationResponse>(handle);
+// Sends the test compaction config (see GetTestCompactionConfig) to the
+// schemeshard with `features` set as its feature flags. The TTestEnv argument
+// is accepted but not used.
+void SetFeatures(
+        TTestActorRuntime &runtime,
+        TTestEnv&,
+        ui64 schemeShard,
+        const NKikimrConfig::TFeatureFlags& features)
+{
+    auto configRequest = GetTestCompactionConfig();
+
+    // Attach the feature flags to the same app config that carries the
+    // compaction settings.
+    auto& appConfig = *configRequest->Record.MutableConfig();
+    *appConfig.MutableFeatureFlags() = features;
+
+    SetConfig(runtime, schemeShard, std::move(configRequest));
+}
void SetBackgroundCompactionServerless(TTestActorRuntime &runtime, TTestEnv& env, ui64 schemeShard, bool value) {
@@ -163,12 +199,16 @@ void DisableBackgroundCompactionViaRestart(
compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds());
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds());
// 1 compaction / second
compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1);
compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0);
+ compactionConfig.MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+
TActorId sender = runtime.AllocateEdgeActor();
RebootTablet(runtime, schemeShard, sender);
}
@@ -188,12 +228,16 @@ void EnableBackgroundCompactionViaRestart(
compactionConfig.MutableBackgroundCompactionConfig()->SetSearchHeightThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetTimeoutSeconds(DefaultTimeout.Seconds());
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(RetryDelay.Seconds());
// 1 compaction / second
compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1);
compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0);
+ compactionConfig.MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+
TActorId sender = runtime.AllocateEdgeActor();
RebootTablet(runtime, schemeShard, sender);
}
@@ -201,17 +245,20 @@ void EnableBackgroundCompactionViaRestart(
struct TCompactionStats {
ui64 BackgroundRequestCount = 0;
ui64 BackgroundCompactionCount = 0;
+ ui64 CompactBorrowedCount = 0;
TCompactionStats() = default;
TCompactionStats(const NKikimrTxDataShard::TEvGetCompactTableStatsResult& stats)
: BackgroundRequestCount(stats.GetBackgroundCompactionRequests())
, BackgroundCompactionCount(stats.GetBackgroundCompactionCount())
+ , CompactBorrowedCount(stats.GetCompactBorrowedCount())
{}
void Update(const TCompactionStats& other) {
BackgroundRequestCount += other.BackgroundRequestCount;
BackgroundCompactionCount += other.BackgroundCompactionCount;
+ CompactBorrowedCount += other.CompactBorrowedCount;
}
};
@@ -267,12 +314,41 @@ TCompactionStats GetCompactionStats(
info.OwnerId);
}
-void CheckShardCompacted(
+// Asserts that the compaction stats fetched for (userTable, tabletId, ownerId)
+// report a non-zero CompactBorrowedCount.
+void CheckShardBorrowedCompacted(
+        TTestActorRuntime &runtime,
+        const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable,
+        ui64 tabletId,
+        ui64 ownerId)
+{
+    const auto stats = GetCompactionStats(runtime, userTable, tabletId, ownerId);
+    const auto count = stats.CompactBorrowedCount;
+
+    UNIT_ASSERT(count > 0);
+}
+
+// Asserts that the compaction stats fetched for (userTable, tabletId, ownerId)
+// report a CompactBorrowedCount of exactly zero.
+void CheckShardNotBorrowedCompacted(
+        TTestActorRuntime &runtime,
+        const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable,
+        ui64 tabletId,
+        ui64 ownerId)
+{
+    const auto stats = GetCompactionStats(runtime, userTable, tabletId, ownerId);
+    const auto count = stats.CompactBorrowedCount;
+
+    UNIT_ASSERT_VALUES_EQUAL(count, 0UL);
+}
+
+void CheckShardBackgroundCompacted(
TTestActorRuntime &runtime,
const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable,
ui64 tabletId,
- ui64 ownerId,
- bool shouldCompacted = true)
+ ui64 ownerId)
{
auto count = GetCompactionStats(
runtime,
@@ -280,14 +356,25 @@ void CheckShardCompacted(
tabletId,
ownerId).BackgroundRequestCount;
- if (shouldCompacted) {
- UNIT_ASSERT(count > 0);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(count, 0UL);
- }
+ UNIT_ASSERT(count > 0);
}
-void CheckNoCompactionsInPeriod(
+// Asserts that the compaction stats fetched for (userTable, tabletId, ownerId)
+// report a BackgroundRequestCount of exactly zero.
+void CheckShardNotBackgroundCompacted(
+        TTestActorRuntime &runtime,
+        const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable,
+        ui64 tabletId,
+        ui64 ownerId)
+{
+    const auto stats = GetCompactionStats(runtime, userTable, tabletId, ownerId);
+    const auto count = stats.BackgroundRequestCount;
+
+    UNIT_ASSERT_VALUES_EQUAL(count, 0UL);
+}
+
+void CheckNoBackgroundCompactionsInPeriod(
TTestActorRuntime &runtime,
TTestEnv& env,
const TString& path,
@@ -329,8 +416,10 @@ void TestBackgroundCompaction(
enableBackgroundCompactionFunc(runtime, env);
env.SimulateSleep(runtime, TDuration::Seconds(30));
- for (auto shard: info.Shards)
- CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId);
+ for (auto shard: info.Shards) {
+ CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId);
+ CheckShardNotBorrowedCompacted(runtime, info.UserTable, shard, info.OwnerId);
+ }
}
ui64 TestServerless(
@@ -421,8 +510,14 @@ ui64 TestServerless(
env.SimulateSleep(runtime, TDuration::Seconds(30));
- for (auto shard: info.Shards)
- CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId, enableServerless);
+ for (auto shard: info.Shards) {
+ if (enableServerless)
+ CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId);
+ else
+ CheckShardNotBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId);
+
+ CheckShardNotBorrowedCompacted(runtime, info.UserTable, shard, info.OwnerId);
+ }
return schemeshardId;
}
@@ -485,7 +580,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
// some time to finish compactions in progress
env.SimulateSleep(runtime, TDuration::Seconds(30));
- CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple");
+ CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/Simple");
}
Y_UNIT_TEST(ShouldNotCompactServerless) {
@@ -527,7 +622,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
// some time to finish compactions in progress
env.SimulateSleep(runtime, TDuration::Seconds(30));
- CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId);
+ CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId);
}
Y_UNIT_TEST(SchemeshardShouldNotCompactBackups) {
@@ -556,7 +651,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
- CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");
+ CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");
UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL);
}
@@ -566,9 +661,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
TTestEnv env(runtime);
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
- //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
- //runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_TRACE);
// disable for the case, when compaction is enabled by default
SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false);
@@ -604,11 +697,386 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
- CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");
+ CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");
UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL);
// original table should not be compacted as well
- CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple");
+ CheckNoBackgroundCompactionsInPeriod(runtime, env, "/MyRoot/Simple");
+ }
+
+ Y_UNIT_TEST(SchemeshardShouldHandleCompactionTimeouts) {
+ // note that this test is also a good way to exercise TOperationQueueWithTimer
+
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+
+ size_t compactionResultCount = 0;
+
+ // capture original observer func by setting dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCompactTableResult: {
+ Y_UNUSED(ev.Release());
+ ++compactionResultCount;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+ ui64 txId = 1000;
+
+ // note that we create 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ while (compactionResultCount < 3UL)
+ env.SimulateSleep(runtime, DefaultTimeout + RetryDelay + TDuration::Seconds(1));
+ }
+};
+
+Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) {
+ Y_UNIT_TEST(SchemeshardShouldCompactBorrowed) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ // in case it is not enabled by default
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+
+ auto configRequest = GetTestCompactionConfig();
+ auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig();
+ compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+
+ SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest));
+
+ ui64 txId = 1000;
+
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 5, txId);
+
+ {
+ // write to all shards in hacky way
+ auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ for (auto shard: simpleInfo.Shards) {
+ WriteData(runtime, "Simple", 0, 100, shard);
+ }
+ }
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+
+ // copy table
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ CopyFromTable: "/MyRoot/Simple"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ env.SimulateSleep(runtime, TDuration::Seconds(30));
+
+ auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ auto copyInfo = GetPathInfo(runtime, "/MyRoot/CopyTable");
+
+ // borrow compaction only runs when we split, so nothing should be borrow compacted yet
+
+ {
+ for (auto shard: simpleInfo.Shards) {
+ CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId);
+ }
+ }
+
+ {
+ for (auto shard: copyInfo.Shards) {
+ CheckShardNotBorrowedCompacted(runtime, copyInfo.UserTable, shard, copyInfo.OwnerId);
+ }
+ }
+
+ // now force split
+
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 20
+ MaxPartitionsCount: 20
+ SizeToSplit: 1
+ }
+ })");
+ env.TestWaitNotification(runtime, txId);
+
+ // schemeshard should get stats from DS to start borrower compactions
+ env.SimulateSleep(runtime, TDuration::Seconds(30));
+
+ // should compact all borrowed data (note that background will not compact until then)
+
+ {
+ for (auto shard: copyInfo.Shards) {
+ CheckShardBorrowedCompacted(runtime, copyInfo.UserTable, shard, copyInfo.OwnerId);
+ }
+ }
+
+ {
+ // Simple is again the only owner
+ for (auto shard: simpleInfo.Shards) {
+ CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId);
+ }
+ }
+
+ // now should be no borrower compactions, but background should do the job
+
+ auto copyCount1 = GetCompactionStats(runtime, "/MyRoot/CopyTable").CompactBorrowedCount;
+ auto simpleCount1 = GetCompactionStats(runtime, "/MyRoot/Simple").CompactBorrowedCount;
+ env.SimulateSleep(runtime, TDuration::Seconds(30));
+
+ {
+ auto info = GetPathInfo(runtime, "/MyRoot/Simple");
+ for (auto shard: info.Shards) {
+ CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId);
+ }
+ auto simpleCount2 = GetCompactionStats(runtime, "/MyRoot/Simple").CompactBorrowedCount;
+ UNIT_ASSERT_VALUES_EQUAL(simpleCount1, simpleCount2);
+ }
+
+ {
+ auto info = GetPathInfo(runtime, "/MyRoot/CopyTable");
+ for (auto shard: info.Shards) {
+ CheckShardBackgroundCompacted(runtime, info.UserTable, shard, info.OwnerId);
+ }
+ auto copyCount2 = GetCompactionStats(runtime, "/MyRoot/CopyTable").CompactBorrowedCount;
+ UNIT_ASSERT_VALUES_EQUAL(copyCount1, copyCount2);
+ }
+ }
+
+ Y_UNIT_TEST(SchemeshardShouldHandleBorrowCompactionTimeouts) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ auto configRequest = GetTestCompactionConfig();
+ auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig();
+ compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+ compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(3);
+
+ SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest));
+
+ size_t borrowedRequests = 0;
+
+ // capture original observer func by setting dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCompactBorrowed: {
+ Y_UNUSED(ev.Release());
+ ++borrowedRequests;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+
+ ui64 txId = 1000;
+
+ // note that we create 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ // copy table
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ CopyFromTable: "/MyRoot/Simple"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // now force split
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 2
+ MaxPartitionsCount: 2
+ SizeToSplit: 1
+ }
+ })");
+ env.TestWaitNotification(runtime, txId);
+
+ // wait until DS reports that it has borrowed data
+ while (borrowedRequests < 1) {
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+ }
+
+ env.SimulateSleep(runtime, TDuration::Seconds(60));
+
+ while (borrowedRequests < 3)
+ env.SimulateSleep(runtime, TDuration::Seconds(10));
+ }
+
+ Y_UNIT_TEST(SchemeshardShouldHandleDataShardReboot) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ auto configRequest = GetTestCompactionConfig();
+ auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig();
+ compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+ compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(1000); // avoid timeouts
+
+ // now we have 1 inflight which will hang the queue in case of a long timeout
+ SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest));
+
+ size_t borrowedRequests = 0;
+
+ // capture original observer func by setting dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCompactBorrowed: {
+ Y_UNUSED(ev.Release());
+ ++borrowedRequests;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+ ui64 txId = 1000;
+
+ // note that we create 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ // copy table
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ CopyFromTable: "/MyRoot/Simple"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // now force split
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 2
+ MaxPartitionsCount: 2
+ SizeToSplit: 1
+ }
+ })");
+ env.TestWaitNotification(runtime, txId);
+
+ // wait until DS reports that it has borrowed data
+ while (borrowedRequests < 1) {
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+ }
+
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 1UL);
+
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 1UL);
+
+ auto info = GetPathInfo(runtime, "/MyRoot/CopyTable");
+ UNIT_ASSERT_VALUES_EQUAL(info.Shards.size(), 1UL);
+
+ // break the pipes and check that SS requested compaction again
+ TActorId sender = runtime.AllocateEdgeActor();
+ RebootTablet(runtime, info.Shards[0], sender);
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 2UL);
+
+ // one more time
+ RebootTablet(runtime, info.Shards[0], sender);
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+ UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, 3UL);
+ }
+
+ Y_UNIT_TEST(SchemeshardShouldNotCompactAfterDrop) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ auto configRequest = GetTestCompactionConfig();
+ auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig();
+ compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+ compactionConfig->MutableBorrowedCompactionConfig()->SetTimeoutSeconds(5);
+
+ // now we have 1 inflight which will hang the queue in case of a long timeout
+ SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest));
+
+ size_t borrowedRequests = 0;
+
+ // capture original observer func by setting dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCompactBorrowed: {
+ Y_UNUSED(ev.Release());
+ ++borrowedRequests;
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+ ui64 txId = 1000;
+
+ // note that we create 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ // copy table
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ CopyFromTable: "/MyRoot/Simple"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // now force split
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 2
+ MaxPartitionsCount: 2
+ SizeToSplit: 1
+ }
+ })");
+ env.TestWaitNotification(runtime, txId);
+
+ // wait until DS reports that it has borrowed data
+ while (borrowedRequests < 1) {
+ env.SimulateSleep(runtime, TDuration::MilliSeconds(100));
+ }
+
+ auto requestsBefore = borrowedRequests;
+
+ // SS waits reply from DS, drop the table meanwhile
+ TestDropTable(runtime, ++txId, "/MyRoot", "CopyTable");
+ env.TestWaitNotification(runtime, txId);
+ env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets + 1);
+
+ env.SimulateSleep(runtime, TDuration::Seconds(10)); // 2x timeout
+ UNIT_ASSERT_VALUES_EQUAL(borrowedRequests, requestsBefore);
}
};
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index 617a1a7f33..4cc983a6d9 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -60,6 +60,7 @@ SRCS(
defs.h
schemeshard.h
schemeshard.cpp
+ schemeshard__borrowed_compaction.cpp
schemeshard__compaction.cpp
schemeshard__clean_pathes.cpp
schemeshard__conditional_erase.cpp
diff --git a/ydb/core/util/operation_queue.h b/ydb/core/util/operation_queue.h
index 19c8750b49..61bed56bc3 100644
--- a/ydb/core/util/operation_queue.h
+++ b/ydb/core/util/operation_queue.h
@@ -4,7 +4,9 @@
#include "intrusive_heap.h"
#include "token_bucket.h"
-#include <library/cpp/time_provider/time_provider.h>
+#include <ydb/core/base/defs.h>
+
+#include <library/cpp/actors/core/monotonic.h>
#include <util/datetime/base.h>
#include <util/generic/algorithm.h>
@@ -26,9 +28,9 @@ enum class EStartStatus {
class ITimer {
public:
// asks to call TOperationQueue::Wakeup()
- virtual void SetWakeupTimer(TInstant t) = 0;
+ virtual void SetWakeupTimer(TDuration delta) = 0;
- virtual TInstant Now() = 0;
+ virtual TMonotonic Now() = 0;
};
template <typename T>
@@ -195,18 +197,18 @@ public:
struct TItemWithTs {
T Item;
- TInstant Timestamp;
+ TMonotonic Timestamp;
explicit TItemWithTs(const T& item)
: Item(item)
{ }
- TItemWithTs(const T& item, TInstant s)
+ TItemWithTs(const T& item, TMonotonic s)
: Item(item)
, Timestamp(s)
{ }
- TItemWithTs(T&& item, TInstant s)
+ TItemWithTs(T&& item, TMonotonic s)
: Item(std::move(item))
, Timestamp(s)
{ }
@@ -259,10 +261,10 @@ private:
TRunningItems RunningItems;
TWaitingItems WaitingItems;
- TTokenBucket TokenBucket;
+ TTokenBucketBase<TMonotonic> TokenBucket;
bool HasRateLimit = false;
- TInstant NextWakeup;
+ TMonotonic NextWakeup;
bool Running = false;
bool WasRunning = false;
@@ -527,8 +529,10 @@ TDuration TOperationQueue<T, TQueue>::OnDone(const T& item) {
template <typename T, typename TQueue>
void TOperationQueue<T, TQueue>::Wakeup() {
+ NextWakeup = {};
StartOperations();
- ScheduleWakeup();
+ if (!NextWakeup)
+ ScheduleWakeup();
}
template <typename T, typename TQueue>
@@ -551,10 +555,19 @@ void TOperationQueue<T, TQueue>::CheckTimeoutOperations() {
while (!RunningItems.Empty()) {
const auto& item = RunningItems.Front();
if (item.Timestamp + Config.Timeout <= now) {
- Starter.OnTimeout(item.Item);
- if (Config.IsCircular)
- ReEnqueueNoStart(std::move(item.Item));
+ auto movedItem = std::move(item.Item);
+
+ // we want to pop before calling OnTimeout, because
+ // it might want to enqueue in case of non-circular
+ // queue
RunningItems.PopFront();
+
+ // note that OnTimeout() can enqueue item back
+ // in case of non-circular queue
+ Starter.OnTimeout(movedItem);
+
+ if (Config.IsCircular)
+ ReEnqueueNoStart(std::move(movedItem));
continue;
}
break;
@@ -622,17 +635,17 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
if (TokenBucket.Available() <= 0) {
// we didn't start anything because of RPS limit
NextWakeup = now + TokenBucket.NextAvailableDelay();
- Timer.SetWakeupTimer(NextWakeup);
+ Timer.SetWakeupTimer(TokenBucket.NextAvailableDelay());
return;
- } else if (!NextWakeup || NextWakeup <= now) {
+ } else if (!NextWakeup) {
// special case when we failed to start anything
NextWakeup = now + Config.WakeupInterval;
- Timer.SetWakeupTimer(NextWakeup);
+ Timer.SetWakeupTimer(Config.WakeupInterval);
return;
}
}
- auto wakeup = TInstant::Max();
+ auto wakeup = TMonotonic::Max();
if (Config.Timeout && !RunningItems.Empty()) {
const auto& item = RunningItems.Front();
@@ -644,7 +657,7 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
wakeup = Min(wakeup, item.Timestamp + Config.MinOperationRepeatDelay);
}
- if (wakeup == TInstant::Max())
+ if (wakeup == TMonotonic::Max())
return;
// no sense to wakeup earlier that rate limit allows
@@ -652,9 +665,9 @@ void TOperationQueue<T, TQueue>::ScheduleWakeup() {
wakeup = Max(wakeup, now + TokenBucket.NextAvailableDelay());
}
- if (!NextWakeup || NextWakeup > wakeup || NextWakeup <= now) {
+ if (!NextWakeup || NextWakeup > wakeup) {
NextWakeup = wakeup;
- Timer.SetWakeupTimer(NextWakeup);
+ Timer.SetWakeupTimer(wakeup - now);
}
}
diff --git a/ydb/core/util/operation_queue_priority_ut.cpp b/ydb/core/util/operation_queue_priority_ut.cpp
index 32bd274fe6..687350cc3b 100644
--- a/ydb/core/util/operation_queue_priority_ut.cpp
+++ b/ydb/core/util/operation_queue_priority_ut.cpp
@@ -2,6 +2,7 @@
#include "circular_queue.h"
+#include <library/cpp/actors/core/monotonic_provider.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/generic/ptr.h>
@@ -65,9 +66,9 @@ namespace {
TDuration Timeout = TDuration::Minutes(10);
-class TSimpleTimeProvider : public ITimeProvider {
+class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider {
public:
- TInstant Now() override {
+ TMonotonic Now() override {
return Now_;
}
@@ -75,12 +76,12 @@ public:
Now_ += delta;
}
- void Move(TInstant now) {
+ void Move(TMonotonic now) {
Now_ = now;
}
private:
- TInstant Now_;
+ TMonotonic Now_;
};
@@ -88,7 +89,7 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue
TSimpleTimeProvider TimeProvider;
TVector<TPriorityItem> StartHistory;
- TVector<TInstant> WakeupHistory;
+ TVector<TMonotonic> WakeupHistory;
NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning;
@@ -98,15 +99,15 @@ struct TOperationStarter : public TPriorityQueue::IStarter, public NOperationQue
return StartResult;
}
- void SetWakeupTimer(TInstant t) override
+ void SetWakeupTimer(TDuration delta) override
{
- WakeupHistory.push_back(t);
+ WakeupHistory.push_back(this->Now() + delta);
}
void OnTimeout(const TPriorityItem&) override
{}
- TInstant Now() override
+ TMonotonic Now() override
{
return TimeProvider.Now();
}
diff --git a/ydb/core/util/operation_queue_ut.cpp b/ydb/core/util/operation_queue_ut.cpp
index 1015cb217e..7daa22ba67 100644
--- a/ydb/core/util/operation_queue_ut.cpp
+++ b/ydb/core/util/operation_queue_ut.cpp
@@ -2,6 +2,7 @@
#include "circular_queue.h"
+#include <library/cpp/actors/core/monotonic_provider.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/generic/ptr.h>
@@ -14,9 +15,9 @@ namespace {
TDuration Timeout = TDuration::Minutes(10);
-class TSimpleTimeProvider : public ITimeProvider {
+class TSimpleTimeProvider : public NActors::IMonotonicTimeProvider {
public:
- TInstant Now() override {
+ TMonotonic Now() override {
return Now_;
}
@@ -24,12 +25,12 @@ public:
Now_ += delta;
}
- void Move(TInstant now) {
+ void Move(TMonotonic now) {
Now_ = now;
}
private:
- TInstant Now_;
+ TMonotonic Now_;
};
using TQueue = TOperationQueue<int, TFifoQueue<int>>;
@@ -38,7 +39,7 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim
TSimpleTimeProvider TimeProvider;
TVector<int> StartHistory;
- TVector<TInstant> WakeupHistory;
+ TVector<TMonotonic> WakeupHistory;
NOperationQueue::EStartStatus StartResult = NOperationQueue::EStartStatus::EOperationRunning;
@@ -48,15 +49,15 @@ struct TOperationStarter : public TQueue::IStarter, public NOperationQueue::ITim
return StartResult;
}
- void SetWakeupTimer(TInstant t) override
+ void SetWakeupTimer(TDuration delta) override
{
- WakeupHistory.push_back(t);
+ WakeupHistory.push_back(this->Now() + delta);
}
void OnTimeout(const int&) override
{}
- TInstant Now() override
+ TMonotonic Now() override
{
return TimeProvider.Now();
}
@@ -68,7 +69,7 @@ void CheckQueue(
TVector<TQueue::TItemWithTs> runningGold,
TVector<int> inQueueGold,
TVector<int> startHistory,
- TVector<TInstant> wakeupHistory)
+ TVector<TMonotonic> wakeupHistory)
{
auto running = queue.GetRunning();
auto inQueue = queue.GetQueue();
@@ -107,7 +108,7 @@ void TestStartInflightBeforeStart(int inflight, int pushN = 10) {
runningGold.push_back({i, now});
}
- TVector<TInstant> wakeupsGold =
+ TVector<TMonotonic> wakeupsGold =
{ starter.TimeProvider.Now() + config.Timeout };
CheckQueue(
@@ -131,7 +132,7 @@ void TestInflightWithEnqueue(int inflight, int pushN = 10) {
TVector<TQueue::TItemWithTs> runningGold;
TVector<int> queuedGold;
- TVector<TInstant> wakeupsGold =
+ TVector<TMonotonic> wakeupsGold =
{ starter.TimeProvider.Now() + TDuration::Seconds(1) + config.Timeout };
int lastStarted = 0;
@@ -988,6 +989,36 @@ Y_UNIT_TEST_SUITE(TCircularOperationQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL);
UNIT_ASSERT(starter.WakeupHistory.back() > starter.TimeProvider.Now());
}
+
+ Y_UNIT_TEST(ShouldTolerateInaccurateTimer) {
+ // should work properly when woken up earlier than requested (i.e. properly set new timer)
+ // regression test: woke up earlier and didn't set new wakeup
+
+ TQueue::TConfig config;
+ config.IsCircular = true;
+ config.InflightLimit = 1;
+ config.MaxRate = 0.0;
+ config.Timeout = Timeout;
+ TOperationStarter starter;
+
+ TQueue queue(config, starter, starter);
+ queue.Start();
+
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 0UL);
+
+ queue.Enqueue(1);
+
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 1UL);
+
+ // expect to wakeup on Timeout, but wakeup earlier
+ starter.TimeProvider.Move(Timeout - TDuration::Seconds(1));
+ queue.Wakeup();
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL);
+ UNIT_ASSERT_VALUES_EQUAL(queue.RunningSize(), 1UL);
+
+ UNIT_ASSERT_VALUES_EQUAL(starter.WakeupHistory.size(), 2UL);
+ }
};
} // NOperationQueue
diff --git a/ydb/core/util/token_bucket.h b/ydb/core/util/token_bucket.h
index 060f8c75a6..8ed8475e53 100644
--- a/ydb/core/util/token_bucket.h
+++ b/ydb/core/util/token_bucket.h
@@ -6,16 +6,17 @@
namespace NKikimr {
-class TTokenBucket {
+template<typename TTime>
+class TTokenBucketBase {
double Tokens = 0.0; // tokens currenty in bucket
double Rate = 0.0; // tokens filling rate [tokens/sec]
double Capacity = 0.0; // maximum amount of tokens allowed in bucket
- TInstant LastFill = TInstant::Zero();
+ TTime LastFill;
public:
// Create unlimited bucket
// NOTE: any bucket is created fully filled
- TTokenBucket() {
+ TTokenBucketBase() {
SetUnlimited();
}
@@ -46,7 +47,7 @@ public:
}
// Fill bucket with tokens, should be done just before Take()
- void Fill(TInstant now) {
+ void Fill(TTime now) {
// NOTE: LastFill is allowed to be zero, the following code will work OK
TDuration elapsed = now - LastFill;
Tokens += elapsed.SecondsFloat() * Rate;
@@ -62,7 +63,7 @@ public:
}
// Fill and take if available, returns taken amount
- double FillAndTryTake(TInstant now, double amount) {
+ double FillAndTryTake(TTime now, double amount) {
Fill(now);
amount = Min(amount, Tokens);
Take(amount);
@@ -78,7 +79,7 @@ public:
return TDuration::MicroSeconds(std::ceil(Available() * -1000000.0 / Rate));
}
- TDuration FillAndNextAvailableDelay(TInstant now) {
+ TDuration FillAndNextAvailableDelay(TTime now) {
Fill(now);
return NextAvailableDelay();
}
@@ -101,4 +102,6 @@ public:
}
};
+using TTokenBucket = TTokenBucketBase<TInstant>;
+
} // namespace NKikimr
diff --git a/ydb/core/util/ya.make b/ydb/core/util/ya.make
index 4f325d8fcb..91416afbc9 100644
--- a/ydb/core/util/ya.make
+++ b/ydb/core/util/ya.make
@@ -62,6 +62,7 @@ SRCS(
)
PEERDIR(
+ library/cpp/actors/core
library/cpp/actors/interconnect/mock
library/cpp/actors/util
library/cpp/containers/stack_vector