summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEvgeniy Ivanov <[email protected]>2022-03-31 22:04:30 +0300
committerEvgeniy Ivanov <[email protected]>2022-03-31 22:04:30 +0300
commitd2fa7a39fd858307d4d5a6e2d548c4a6ee4f496f (patch)
tree58c01c53ec453295133c94636a8808d77af63a81
parentfa4b82d3614d68d8c7ede2bf4f58ffa9f9487784 (diff)
KIKIMR-9748: don't background compact neither source nor dst copy tables
ref:43925608e7050d2b17a305e8335b75095ed42a22
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/protos/counters_schemeshard.proto2
-rw-r--r--ydb/core/protos/table_stats.proto3
-rw-r--r--ydb/core/protos/tx_datashard.proto4
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp6
-rw-r--r--ydb/core/tablet_flat/flat_executor.h1
-rw-r--r--ydb/core/tablet_flat/flat_executor_borrowlogic.cpp9
-rw-r--r--ydb/core/tablet_flat/flat_executor_borrowlogic.h3
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h1
-rw-r--r--ydb/core/tx/datashard/datashard__compaction.cpp25
-rw-r--r--ydb/core/tx/datashard/datashard__stats.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h1
-rw-r--r--ydb/core/tx/datashard/datashard_user_table.h1
-rw-r--r--ydb/core/tx/datashard/datashard_ut_background_compaction.cpp50
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp25
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h6
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp27
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h23
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__compaction.cpp80
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h14
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h5
-rw-r--r--ydb/core/tx/schemeshard/ut_compaction.cpp284
25 files changed, 461 insertions, 119 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index db413eaceab..26e35e98c5b 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -109,6 +109,7 @@ enum ECumulativeCounters {
COUNTER_TX_BACKGROUND_COMPACTION_FAILED_BORROWED = 83 [(CounterOpts) = {Name: "TxCompactTableFailedBorrowed"}];
COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START = 84 [(CounterOpts) = {Name: "TxCompactTableFailedStart"}];
COUNTER_FULL_COMPACTION_DONE = 85 [(CounterOpts) = {Name: "FullCompactionCount"}];
+ COUNTER_TX_BACKGROUND_COMPACTION_FAILED_LOANED = 86 [(CounterOpts) = {Name: "TxCompactTableFailedLoaned"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 70378fa470e..7484b511b88 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -154,6 +154,7 @@ enum ESimpleCounters {
COUNTER_COMPACTION_QUEUE_RUNNING = 125 [(CounterOpts) = {Name: "BackgroundCompactionQueueRunning"}];
COUNTER_COMPACTION_QUEUE_WAITING_REPEAT = 126 [(CounterOpts) = {Name: "BackgroundCompactionQueueWaitingRepeat"}];
COUNTER_SHARDS_WITH_BORROWED_DATA = 127 [(CounterOpts) = {Name: "TableShardsWithBorrowedData"}];
+ COUNTER_SHARDS_WITH_LOANED_DATA = 128 [(CounterOpts) = {Name: "TableShardsWithLoanedData"}];
}
enum ECumulativeCounters {
@@ -244,6 +245,7 @@ enum ECumulativeCounters {
COUNTER_BACKGROUND_COMPACTION_BORROWED = 73 [(CounterOpts) = {Name: "BackgroundCompactionBorrowed"}];
COUNTER_BACKGROUND_COMPACTION_NOT_NEEDED = 74 [(CounterOpts) = {Name: "BackgroundCompactionNotNeeded"}];
+ COUNTER_BACKGROUND_COMPACTION_LOANED = 75 [(CounterOpts) = {Name: "BackgroundCompactionLoaned"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/table_stats.proto b/ydb/core/protos/table_stats.proto
index a555e0b659d..a2a22e8ff54 100644
--- a/ydb/core/protos/table_stats.proto
+++ b/ydb/core/protos/table_stats.proto
@@ -43,4 +43,7 @@ message TTableStats {
// seconds since epoch
optional uint64 LastFullCompactionTs = 28;
+
+ // i.e. this shard lent to other shards
+ optional bool HasLoanedParts = 29;
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 68c92ea5aad..e197ecf4662 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1400,7 +1400,7 @@ message TEvKqpScan {
message TEvCompactTable {
optional NKikimrProto.TPathID PathId = 1;
- optional bool CompactBorrowed = 2;
+ optional bool CompactBorrowed = 2; // i.e. taken
optional bool CompactSinglePartedShards = 3;
}
@@ -1410,6 +1410,7 @@ message TEvCompactTableResult {
NOT_NEEDED = 1;
FAILED = 2;
BORROWED = 3;
+ LOANED = 4;
};
optional uint64 TabletId = 1;
@@ -1427,6 +1428,7 @@ message TEvGetCompactTableStats {
message TEvGetCompactTableStatsResult {
optional uint64 BackgroundCompactionRequests = 1;
+ optional uint64 BackgroundCompactionCount = 2;
}
// TEvRead is used to request multiple queries from a shard and
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 7f8c5550ad4..91f3a137bc1 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -3798,6 +3798,12 @@ THashMap<TLogoBlobID, TVector<ui64>> TExecutor::GetBorrowedParts() const {
return { };
}
+bool TExecutor::HasLoanedParts() const {
+ if (BorrowLogic)
+ return BorrowLogic->HasLoanedParts();
+ return false;
+}
+
const TExecutorStats& TExecutor::GetStats() const {
return *Stats;
}
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index 3a61e5385dd..9e72b436cfc 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -645,6 +645,7 @@ public:
void SendUserAuxUpdateToFollowers(TString upd, const TActorContext &ctx) override;
THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const override;
+ bool HasLoanedParts() const override;
const TExecutorStats& GetStats() const override;
NMetrics::TResourceMetrics* GetResourceMetrics() const override;
diff --git a/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp b/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp
index bd52264b10e..b0371b2701c 100644
--- a/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp
+++ b/ydb/core/tablet_flat/flat_executor_borrowlogic.cpp
@@ -593,4 +593,13 @@ THashMap<TLogoBlobID, TVector<ui64>> TExecutorBorrowLogic::GetBorrowedParts() co
return result;
}
+bool TExecutorBorrowLogic::HasLoanedParts() const {
+ for (const auto &xpair : BorrowedInfo) {
+ if (xpair.second.BorrowInfo.FullBorrow) {
+ return true;
+ }
+ }
+ return false;
+}
+
}}
diff --git a/ydb/core/tablet_flat/flat_executor_borrowlogic.h b/ydb/core/tablet_flat/flat_executor_borrowlogic.h
index 5d990b49dc0..31021454854 100644
--- a/ydb/core/tablet_flat/flat_executor_borrowlogic.h
+++ b/ydb/core/tablet_flat/flat_executor_borrowlogic.h
@@ -168,6 +168,9 @@ public:
// for cleanup
THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const;
+
+ // i.e. parts we own, but loaned to others
+ bool HasLoanedParts() const;
};
}}
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index babe00d8635..f628f6cd42b 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -540,6 +540,7 @@ namespace NFlatExecutorSetup {
// Returns parts owned by this tablet and borrowed by other tablets
virtual THashMap<TLogoBlobID, TVector<ui64>> GetBorrowedParts() const = 0;
+ virtual bool HasLoanedParts() const = 0;
// This method lets executor know about new yellow channels
virtual void OnYellowChannels(TVector<ui32> yellowMoveChannels, TVector<ui32> yellowStopChannels) = 0;
diff --git a/ydb/core/tx/datashard/datashard__compaction.cpp b/ydb/core/tx/datashard/datashard__compaction.cpp
index 8d4c2b84eb8..317767d7368 100644
--- a/ydb/core/tx/datashard/datashard__compaction.cpp
+++ b/ydb/core/tx/datashard/datashard__compaction.cpp
@@ -66,7 +66,6 @@ public:
const TUserTable& tableInfo = *it->second;
const auto localTid = tableInfo.LocalTid;
- // TODO: consider using metrics instead
++tableInfo.Stats.BackgroundCompactionRequests;
auto stats = txc.DB.GetCompactionStats(localTid);
@@ -82,11 +81,12 @@ public:
if (hasBorrowed && !record.GetCompactBorrowed()) {
// normally we should not receive requests to compact in this case
+ // but in some rare cases like schemeshard restart we can
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Background compaction of tablet# " << Self->TabletID()
<< " of path# " << pathId
<< ", requested from# " << Ev->Sender
- << " contains borrowed data, failed");
+ << " contains borrowed parts, failed");
Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION_FAILED_BORROWED);
@@ -98,6 +98,25 @@ public:
return true;
}
+ if (Self->Executor()->HasLoanedParts()) {
+ // normally we should not receive requests to compact in this case
+ // but in some rare cases like schemeshard restart we can
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Background compaction of tablet# " << Self->TabletID()
+ << " of path# " << pathId
+ << ", requested from# " << Ev->Sender
+ << " contains loaned parts, failed");
+
+ Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION_FAILED_LOANED);
+
+ auto response = MakeHolder<TEvDataShard::TEvCompactTableResult>(
+ Self->TabletID(),
+ pathId,
+ NKikimrTxDataShard::TEvCompactTableResult::LOANED);
+ ctx.Send(Ev->Sender, std::move(response));
+ return true;
+ }
+
bool isEmpty = stats.PartCount == 0 && stats.MemDataSize == 0;
bool isSingleParted = stats.PartCount == 1 && stats.MemDataSize == 0;
if (isEmpty || isSingleParted && !hasBorrowed && !record.HasCompactSinglePartedShards()) {
@@ -133,6 +152,7 @@ public:
Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION);
Self->CompactionWaiters[tableInfo.LocalTid].emplace_back(std::make_tuple(compactionId, pathId, Ev->Sender));
+ ++tableInfo.Stats.BackgroundCompactionCount;
} else {
// compaction failed, for now we don't care
Self->IncCounter(COUNTER_TX_BACKGROUND_COMPACTION_FAILED_START);
@@ -253,6 +273,7 @@ void TDataShard::Handle(TEvDataShard::TEvGetCompactTableStats::TPtr& ev, const T
if (it != TableInfos.end()) {
const TUserTable& tableInfo = *it->second;
response->Record.SetBackgroundCompactionRequests(tableInfo.Stats.BackgroundCompactionRequests);
+ response->Record.SetBackgroundCompactionCount(tableInfo.Stats.BackgroundCompactionCount);
}
ctx.Send(ev->Sender, std::move(response));
diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp
index 3ee32261af4..c618255c7f0 100644
--- a/ydb/core/tx/datashard/datashard__stats.cpp
+++ b/ydb/core/tx/datashard/datashard__stats.cpp
@@ -126,6 +126,7 @@ public:
Result->Record.MutableTableStats()->SetPartCount(tableInfo.Stats.PartCount);
Result->Record.MutableTableStats()->SetSearchHeight(tableInfo.Stats.SearchHeight);
Result->Record.MutableTableStats()->SetLastFullCompactionTs(tableInfo.Stats.LastFullCompaction.Seconds());
+ Result->Record.MutableTableStats()->SetHasLoanedParts(Self->Executor()->HasLoanedParts());
Result->Record.SetShardState(Self->State);
for (const auto& pi : tableInfo.Stats.PartOwners) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 9184a788727..4373e74354f 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -2500,6 +2500,7 @@ protected:
ev->Record.MutableTableStats()->SetPartCount(ti.Stats.PartCount);
ev->Record.MutableTableStats()->SetSearchHeight(ti.Stats.SearchHeight);
ev->Record.MutableTableStats()->SetLastFullCompactionTs(ti.Stats.LastFullCompaction.Seconds());
+ ev->Record.MutableTableStats()->SetHasLoanedParts(Executor()->HasLoanedParts());
if (!ti.Stats.PartOwners.contains(TabletID())) {
ev->Record.AddUserTablePartOwners(TabletID());
diff --git a/ydb/core/tx/datashard/datashard_user_table.h b/ydb/core/tx/datashard/datashard_user_table.h
index 4cc7845d8b4..2324be8b5d9 100644
--- a/ydb/core/tx/datashard/datashard_user_table.h
+++ b/ydb/core/tx/datashard/datashard_user_table.h
@@ -318,6 +318,7 @@ struct TUserTable : public TThrRefBase {
ui64 DataSizeResolution = 0;
ui64 RowCountResolution = 0;
ui64 BackgroundCompactionRequests = 0;
+ ui64 BackgroundCompactionCount = 0;
NTable::TKeyAccessSample AccessStats;
void Update(NTable::TStats&& dataStats, ui64 indexSize, THashSet<ui64>&& partOwners, ui64 partCount, TInstant statsUpdateTime) {
diff --git a/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp b/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp
index 758508bd29e..22bcd5977a8 100644
--- a/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_background_compaction.cpp
@@ -70,22 +70,68 @@ Y_UNIT_TEST_SUITE(DataShardBackgroundCompaction) {
ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 100), (3, 300), (5, 500);");
auto shards = GetTableShards(server, sender, "/Root/table-1");
+ UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1UL);
+ const auto originalShard = shards.at(0);
SetSplitMergePartCountLimit(&runtime, -1);
ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards.at(0), 3);
WaitTxNotification(server, sender, txId);
- auto [tables, ownerId] = GetTables(server, shards.at(0));
-
shards = GetTableShards(server, sender, "/Root/table-1");
UNIT_ASSERT(shards.size() > 1);
for (auto shard: shards) {
+ auto [tables, ownerId] = GetTables(server, shard);
auto compactionResult = CompactTable(server, tables["table-1"], shard, ownerId);
+ UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::BORROWED);
+ }
+
+ {
+ auto [tables, ownerId] = GetTables(server, originalShard);
+ // try to compact original table (should be inactive now)
+ auto compactionResult = CompactTable(server, tables["table-1"], originalShard, ownerId);
UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::FAILED);
}
}
+ Y_UNIT_TEST(ShouldNotCompactWhenCopyTable) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ ExecSQL(server, sender, "UPSERT INTO [/Root/table-1] (key, value) VALUES (1, 100), (3, 300), (5, 500);");
+
+ auto txIdCopy = AsyncCreateCopyTable(server, sender, "/Root", "table-2", "/Root/table-1");
+ WaitTxNotification(server, sender, txIdCopy);
+
+ {
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+
+ auto [tables, ownerId] = GetTables(server, shards.at(0));
+ auto compactionResult = CompactTable(server, tables["table-1"], shards.at(0), ownerId);
+ UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::LOANED);
+ }
+
+ {
+ auto shards = GetTableShards(server, sender, "/Root/table-2");
+
+ auto [tables, ownerId] = GetTables(server, shards.at(0));
+ auto compactionResult = CompactTable(server, tables["table-2"], shards.at(0), ownerId);
+ UNIT_ASSERT_VALUES_EQUAL(compactionResult.GetStatus(), NKikimrTxDataShard::TEvCompactTableResult::BORROWED);
+ }
+ }
+
Y_UNIT_TEST(ShouldNotCompactEmptyTable) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index 3ed441d3963..820d7f2d2c1 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1203,6 +1203,31 @@ void CreateShardedTable(Tests::TServer::TPtr server,
CreateShardedTable(server, sender, root, name, opts);
}
+ui64 AsyncCreateCopyTable(
+ Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &root,
+ const TString &name,
+ const TString &from)
+{
+ auto &runtime = *server->GetRuntime();
+
+ // Create table with four shards.
+ auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ request->Record.SetExecTimeoutPeriod(Max<ui64>());
+ auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
+ tx.SetWorkingDir(root);
+ auto &desc = *tx.MutableCreateTable();
+ desc.SetName(name);
+ desc.SetCopyFromTable(from);
+
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
+ return ev->Get()->Record.GetTxId();
+}
+
NKikimrScheme::TEvDescribeSchemeResult DescribeTable(Tests::TServer::TPtr server,
TActorId sender,
const TString &path)
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 79794005400..7f903afe5f6 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -469,6 +469,12 @@ void CreateShardedTable(Tests::TServer::TPtr server,
const NLocalDb::TCompactionPolicy* policy = nullptr,
EShadowDataMode shadowData = EShadowDataMode::Default);
+ui64 AsyncCreateCopyTable(Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &root,
+ const TString &name,
+ const TString &from);
+
TVector<ui64> GetTableShards(Tests::TServer::TPtr server,
TActorId sender,
const TString &path);
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index e7ca6bd002f..3a568664ae4 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -2519,33 +2519,6 @@ Y_UNIT_TEST_QUAD(TestOutOfOrderNoBarrierRestartImmediateLongTail, UseMvcc, UseNe
}
}
-namespace {
- ui64 AsyncCreateCopyTable(
- Tests::TServer::TPtr server,
- TActorId sender,
- const TString &root,
- const TString &name,
- const TString &from)
- {
- auto &runtime = *server->GetRuntime();
-
- // Create table with four shards.
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
- tx.SetWorkingDir(root);
- auto &desc = *tx.MutableCreateTable();
- desc.SetName(name);
- desc.SetCopyFromTable(from);
-
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
- }
-}
-
Y_UNIT_TEST_QUAD(TestCopyTableNoDeadlock, UseMvcc, UseNewEngine) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index 1ba33fc7d47..53be2907a8b 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -7,6 +7,8 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/util/operation_queue.h>
+#include <library/cpp/actors/core/log.h>
+
// TODO: TOperationQueueWithTimer is a good candidate for core/util, but since
// it uses actorlib_impl, which depends on core/util, it
// can't be part of util. No other better place yet and since
@@ -17,14 +19,14 @@ namespace NKikimr {
// TODO: make part of util?
namespace NOperationQueue {
-template <typename T, typename TQueue, int Ev>
+template <typename T, typename TQueue, int Ev, int LogServiceId>
class TOperationQueueWithTimer
- : public TActor<TOperationQueueWithTimer<T, TQueue, Ev>>
+ : public TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>>
, public ITimer
, public TOperationQueue<T, TQueue>
{
- using TThis = ::NKikimr::NOperationQueue::TOperationQueueWithTimer<T, TQueue, Ev>;
- using TActorBase = TActor<TOperationQueueWithTimer<T, TQueue, Ev>>;
+ using TThis = ::NKikimr::NOperationQueue::TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>;
+ using TActorBase = TActor<TOperationQueueWithTimer<T, TQueue, Ev, LogServiceId>>;
using TBase = TOperationQueue<T, TQueue>;
struct TEvWakeupQueue : public TEventLocal<TEvWakeupQueue, Ev> {
@@ -32,6 +34,7 @@ class TOperationQueueWithTimer
};
private:
+ NKikimrServices::EServiceKikimr ServiceId = NKikimrServices::EServiceKikimr(LogServiceId);
TActorId LongTimerId;
TInstant When;
@@ -67,22 +70,28 @@ private:
this->Send(LongTimerId, new TEvents::TEvPoison);
When = t;
- LongTimerId = CreateLongTimer(t - Now(),
+ auto delta = t - this->Now();
+ auto ctx = TActivationContext::ActorContextFor(TActorBase::SelfId());
+ LongTimerId = CreateLongTimer(ctx, delta,
new IEventHandle(TActorBase::SelfId(), TActorBase::SelfId(), new TEvWakeupQueue));
+
+ LOG_DEBUG_S(ctx, ServiceId,
+ "Operation queue set NextWakeup# " << When << ", delta# " << delta.Seconds() << " seconds");
}
TInstant Now() override {
return AppData()->TimeProvider->Now();
}
- void HandleWakeup() {
+ void HandleWakeup(const TActorContext &ctx) {
+ LOG_DEBUG_S(ctx, ServiceId, "Operation queue wakeup# " << this->Now());
TBase::Wakeup();
}
STFUNC(StateWork) {
Y_UNUSED(ctx);
switch (ev->GetTypeRewrite()) {
- cFunc(TEvWakeupQueue::EventType, HandleWakeup);
+ CFunc(TEvWakeupQueue::EventType, HandleWakeup);
}
}
};
diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
index 4d937c2ad44..702ace05dc7 100644
--- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
@@ -3,15 +3,16 @@
namespace NKikimr {
namespace NSchemeShard {
-NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardIdx& shardId) {
+NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TShardCompactionInfo& info) {
UpdateBackgroundCompactionQueueMetrics();
auto ctx = TActivationContext::ActorContextFor(SelfId());
- auto it = ShardInfos.find(shardId);
+ const auto& shardIdx = info.ShardIdx;
+ auto it = ShardInfos.find(shardIdx);
if (it == ShardInfos.end()) {
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info "
- "for background compaction# " << shardId
+ "for background compaction# " << shardIdx
<< " at schemeshard# " << TabletID());
return NOperationQueue::EStartStatus::EOperationRemove;
@@ -22,6 +23,7 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCompaction "
"for pathId# " << pathId << ", datashard# " << datashardId
+ << ", compactionInfo# " << info
<< ", next wakeup# " << CompactionQueue->GetWakeupTime()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
@@ -42,17 +44,17 @@ NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCompaction(const TSha
return NOperationQueue::EStartStatus::EOperationRunning;
}
-void TSchemeShard::OnBackgroundCompactionTimeout(const TShardIdx& shardId) {
+void TSchemeShard::OnBackgroundCompactionTimeout(const TShardCompactionInfo& info) {
UpdateBackgroundCompactionQueueMetrics();
-
TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_TIMEOUT].Increment(1);
auto ctx = TActivationContext::ActorContextFor(SelfId());
- auto it = ShardInfos.find(shardId);
+ const auto& shardIdx = info.ShardIdx;
+ auto it = ShardInfos.find(shardIdx);
if (it == ShardInfos.end()) {
LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unable to resolve shard info "
- "for timeout background compaction# " << shardId
+ "for timeout background compaction# " << shardIdx
<< " at schemeshard# " << TabletID());
return;
}
@@ -62,6 +64,7 @@ void TSchemeShard::OnBackgroundCompactionTimeout(const TShardIdx& shardId) {
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background compaction timeout "
"for pathId# " << pathId << ", datashard# " << datashardId
+ << ", compactionInfo# " << info
<< ", next wakeup# " << CompactionQueue->GetWakeupTime()
<< ", rate# " << CompactionQueue->GetRate()
<< ", in queue# " << CompactionQueue->Size() << " shards"
@@ -100,6 +103,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
} else {
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Finished background compaction "
"for pathId# " << pathId << ", datashard# " << tabletId
+ << ", shardIdx# " << shardIdx
<< " in# " << duration.MilliSeconds() << " ms, with status# " << (int)record.GetStatus()
<< ", next wakeup# " << CompactionQueue->GetWakeupTime()
<< ", rate# " << CompactionQueue->GetRate()
@@ -126,6 +130,9 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
case NKikimrTxDataShard::TEvCompactTableResult::BORROWED:
TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_BORROWED].Increment(1);
break;
+ case NKikimrTxDataShard::TEvCompactTableResult::LOANED:
+ TabletCounters->Cumulative()[COUNTER_BACKGROUND_COMPACTION_LOANED].Increment(1);
+ break;
}
UpdateBackgroundCompactionQueueMetrics();
@@ -138,8 +145,30 @@ void TSchemeShard::EnqueueCompaction(
if (!CompactionQueue)
return;
- if (stats.HasBorrowed)
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+
+ if (stats.HasBorrowedData) {
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "background compaction enqueue skipped shard# " << shardIdx
+ << " with borrowed parts at schemeshard " << TabletID());
return;
+ }
+
+ if (stats.HasLoanedData) {
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "background compaction enqueue skipped shard# " << shardIdx
+ << " with loaned parts at schemeshard " << TabletID());
+ return;
+ }
+
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "background compaction enqueue shard# " << shardIdx
+ << " with partCount# " << stats.PartCount
+ << ", rowCount# " << stats.RowCount
+ << ", searchHeight# " << stats.SearchHeight
+ << ", memDataSize# " << stats.MemDataSize
+ << ", lastFullCompaction# " << TInstant::Seconds(stats.FullCompactionTs)
+ << " at schemeshard " << TabletID());
CompactionQueue->Enqueue(TShardCompactionInfo(shardIdx, stats));
UpdateBackgroundCompactionQueueMetrics();
@@ -152,11 +181,33 @@ void TSchemeShard::UpdateCompaction(
if (!CompactionQueue)
return;
- if (newStats.HasBorrowed) {
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+
+ if (newStats.HasBorrowedData) {
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "background compaction update removed shard# " << shardIdx
+ << " with borrowed parts at schemeshard " << TabletID());
+ RemoveCompaction(shardIdx);
+ return;
+ }
+
+ if (newStats.HasLoanedData) {
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "background compaction update removed shard# " << shardIdx
+ << " with loaned parts at schemeshard " << TabletID());
RemoveCompaction(shardIdx);
return;
}
+ LOG_TRACE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "background compaction enqueue/update shard# " << shardIdx
+ << " with partCount# " << newStats.PartCount
+ << ", rowCount# " << newStats.RowCount
+ << ", searchHeight# " << newStats.SearchHeight
+ << ", memDataSize# " << newStats.MemDataSize
+ << ", lastFullCompaction# " << TInstant::Seconds(newStats.FullCompactionTs)
+ << " at schemeshard " << TabletID());
+
TShardCompactionInfo info(shardIdx, newStats);
if (!CompactionQueue->Update(info))
CompactionQueue->Enqueue(std::move(info));
@@ -191,12 +242,18 @@ void TSchemeShard::UpdateShardMetrics(
const TShardIdx& shardIdx,
const TTableInfo::TPartitionStats& newStats)
{
- if (newStats.HasBorrowed)
+ if (newStats.HasBorrowedData)
ShardsWithBorrowed.insert(shardIdx);
else
ShardsWithBorrowed.erase(shardIdx);
TabletCounters->Simple()[COUNTER_SHARDS_WITH_BORROWED_DATA].Set(ShardsWithBorrowed.size());
+ if (newStats.HasLoanedData)
+ ShardsWithLoaned.insert(shardIdx);
+ else
+ ShardsWithLoaned.erase(shardIdx);
+ TabletCounters->Simple()[COUNTER_SHARDS_WITH_LOANED_DATA].Set(ShardsWithLoaned.size());
+
THashMap<TShardIdx, TPartitionMetrics>::insert_ctx insertCtx;
auto it = PartitionMetricsMap.find(shardIdx, insertCtx);
if (it != PartitionMetricsMap.end()) {
@@ -230,6 +287,9 @@ void TSchemeShard::RemoveShardMetrics(const TShardIdx& shardIdx) {
ShardsWithBorrowed.erase(shardIdx);
TabletCounters->Simple()[COUNTER_SHARDS_WITH_BORROWED_DATA].Set(ShardsWithBorrowed.size());
+ ShardsWithLoaned.erase(shardIdx);
+ TabletCounters->Simple()[COUNTER_SHARDS_WITH_LOANED_DATA].Set(ShardsWithLoaned.size());
+
auto it = PartitionMetricsMap.find(shardIdx);
if (it == PartitionMetricsMap.end())
return;
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index 28905d5b3d4..4ade8158a3a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -178,10 +178,11 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
newStats.FullCompactionTs = tableStats.GetLastFullCompactionTs();
newStats.MemDataSize = tableStats.GetInMemSize();
newStats.StartTime = TInstant::MilliSeconds(rec.GetStartTime());
+ newStats.HasLoanedData = tableStats.GetHasLoanedParts();
for (ui64 tabletId : rec.GetUserTablePartOwners()) {
newStats.PartOwners.insert(TTabletId(tabletId));
if (tabletId != rec.GetDatashardId()) {
- newStats.HasBorrowed = true;
+ newStats.HasBorrowedData = true;
}
}
for (ui64 tabletId : rec.GetSysTablesPartOwners()) {
@@ -328,7 +329,7 @@ bool TTxStorePartitionStats::Execute(TTransactionContext& txc, const TActorConte
}
}
- if (newStats.HasBorrowed) {
+ if (newStats.HasBorrowedData) {
// We don't want to split shards that have borrow parts
// We must ask them to compact first
CompactEv.Reset(new TEvDataShard::TEvCompactBorrowed(tableId));
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index f0df5fa4542..8059d196d7f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -290,6 +290,7 @@ void TSchemeShard::Clear() {
UpdateBackgroundCompactionQueueMetrics();
}
ShardsWithBorrowed.clear();
+ ShardsWithLoaned.clear();
PersQueueGroups.clear();
RtmrVolumes.clear();
SubDomains.clear();
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 07f58ad7d02..aebd6694934 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -78,7 +78,8 @@ private:
using TCompactionQueue = NOperationQueue::TOperationQueueWithTimer<
TShardCompactionInfo,
TCompactionQueueImpl,
- TEvPrivate::EvRunBackgroundCompaction>;
+ TEvPrivate::EvRunBackgroundCompaction,
+ NKikimrServices::FLAT_TX_SCHEMESHARD>;
class TCompactionStarter : public TCompactionQueue::IStarter {
public:
@@ -87,11 +88,11 @@ private:
{ }
NOperationQueue::EStartStatus StartOperation(const TShardCompactionInfo& info) override {
- return Self->StartBackgroundCompaction(info.ShardIdx);
+ return Self->StartBackgroundCompaction(info);
}
void OnTimeout(const TShardCompactionInfo& info) override {
- Self->OnBackgroundCompactionTimeout(info.ShardIdx);
+ Self->OnBackgroundCompactionTimeout(info);
}
private:
@@ -205,7 +206,8 @@ public:
TCompactionStarter CompactionStarter;
TCompactionQueue* CompactionQueue = nullptr;
- THashSet<TShardIdx> ShardsWithBorrowed;
+ THashSet<TShardIdx> ShardsWithBorrowed; // shards have parts from another shards
+ THashSet<TShardIdx> ShardsWithLoaned; // shards have parts loaned to another shards
bool EnableBackgroundCompaction = false;
bool EnableBackgroundCompactionServerless = false;
@@ -645,8 +647,8 @@ public:
void ShardRemoved(const TShardIdx& shardIdx);
- NOperationQueue::EStartStatus StartBackgroundCompaction(const TShardIdx& shardId);
- void OnBackgroundCompactionTimeout(const TShardIdx& shardId);
+ NOperationQueue::EStartStatus StartBackgroundCompaction(const TShardCompactionInfo& info);
+ void OnBackgroundCompactionTimeout(const TShardCompactionInfo& info);
void UpdateBackgroundCompactionQueueMetrics();
struct TTxCleanDroppedSubDomains;
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 332746dae08..67c209da1e1 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1441,7 +1441,7 @@ bool TTableInfo::TryAddShardToMerge(const TSplitSettings& splitSettings,
}
// We don't want to merge shards that have borrowed non-compacted data
- if (stats->HasBorrowed)
+ if (stats->HasBorrowedData)
return false;
bool canMerge = false;
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 73dced02199..9175c1b86e3 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -247,7 +247,10 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
ui32 ShardState = NKikimrTxDataShard::Unknown;
// True when PartOwners has parts from other tablets
- bool HasBorrowed = false;
+ bool HasBorrowedData = false;
+
+ // True when lent parts to other tablets
+ bool HasLoanedData = false;
// Tablet actor started at
TInstant StartTime;
diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp
index bcc38c2d8ac..1b05ad8d66a 100644
--- a/ydb/core/tx/schemeshard/ut_compaction.cpp
+++ b/ydb/core/tx/schemeshard/ut_compaction.cpp
@@ -46,6 +46,69 @@ std::pair<TTableInfoMap, ui64> GetTables(
return std::make_pair(result, ownerId);
}
+struct TPathInfo {
+ ui64 OwnerId = TTestTxConfig::SchemeShard;
+ NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable;
+ TVector<ui64> Shards;
+};
+
+TPathInfo GetPathInfo(
+ TTestActorRuntime &runtime,
+ const char* fullPath,
+ ui64 schemeshardId = TTestTxConfig::SchemeShard)
+{
+ TPathInfo info;
+ auto description = DescribePrivatePath(runtime, schemeshardId, fullPath, true, true);
+ for (auto &part : description.GetPathDescription().GetTablePartitions())
+ info.Shards.push_back(part.GetDatashardId());
+
+ auto [tables, ownerId] = GetTables(runtime, info.Shards.at(0));
+ auto userTableName = TStringBuf(fullPath).RNextTok('/');
+ info.UserTable = tables[userTableName];
+ info.OwnerId = ownerId;
+
+ return info;
+}
+
+void CreateTableWithData(
+ TTestActorRuntime &runtime,
+ TTestEnv& env,
+ const char* path,
+ const char* name,
+ ui32 shardsCount,
+ ui64& txId,
+ ui64 schemeshardId = TTestTxConfig::SchemeShard)
+{
+ TestCreateTable(runtime, schemeshardId, ++txId, path,
+ Sprintf(R"____(
+ Name: "%s"
+ Columns { Name: "key" Type: "Uint64"}
+ Columns { Name: "value" Type: "Utf8"}
+ KeyColumnNames: ["key"]
+ UniformPartitionsCount: %d
+ )____", name, shardsCount));
+ env.TestWaitNotification(runtime, txId, schemeshardId);
+
+ auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint64 '%lu)) ) )
+ (let value '('('value (Utf8 'MostMeaninglessValueInTheWorld)) ) )
+ (return (AsList (UpdateRow '__user__%s key value) ))
+ )
+ )", key, tableName);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
+ };
+
+ for (ui64 key = 0; key < 100; ++key) {
+ fnWriteRow(TTestTxConfig::FakeHiveTablets, key, name);
+ }
+}
+
void SetFeatures(
TTestActorRuntime &runtime,
TTestEnv&,
@@ -61,6 +124,11 @@ void SetFeatures(
compactionConfig->MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig->MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ // 1 compaction / second
+ compactionConfig->MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
+ compactionConfig->MutableBackgroundCompactionConfig()->SetMaxRate(1);
+ compactionConfig->MutableBackgroundCompactionConfig()->SetRoundSeconds(0);
+
auto sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(schemeShard, sender, request.Release(), 0, GetPipeConfigWithRetries());
@@ -96,6 +164,11 @@ void DisableBackgroundCompactionViaRestart(
compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ // 1 compaction / second
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0);
+
TActorId sender = runtime.AllocateEdgeActor();
RebootTablet(runtime, schemeShard, sender);
}
@@ -116,11 +189,33 @@ void EnableBackgroundCompactionViaRestart(
compactionConfig.MutableBackgroundCompactionConfig()->SetRowCountThreshold(0);
compactionConfig.MutableBackgroundCompactionConfig()->SetCompactSinglePartedShards(true);
+ // 1 compaction / second
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMinCompactionRepeatDelaySeconds(0);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetMaxRate(1);
+ compactionConfig.MutableBackgroundCompactionConfig()->SetRoundSeconds(0);
+
TActorId sender = runtime.AllocateEdgeActor();
RebootTablet(runtime, schemeShard, sender);
}
-ui64 GetCompactionsCount(
+struct TCompactionStats {
+ ui64 BackgroundRequestCount = 0;
+ ui64 BackgroundCompactionCount = 0;
+
+ TCompactionStats() = default;
+
+ TCompactionStats(const NKikimrTxDataShard::TEvGetCompactTableStatsResult& stats)
+ : BackgroundRequestCount(stats.GetBackgroundCompactionRequests())
+ , BackgroundCompactionCount(stats.GetBackgroundCompactionCount())
+ {}
+
+ void Update(const TCompactionStats& other) {
+ BackgroundRequestCount += other.BackgroundRequestCount;
+ BackgroundCompactionCount += other.BackgroundCompactionCount;
+ }
+};
+
+TCompactionStats GetCompactionStats(
TTestActorRuntime &runtime,
const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable,
ui64 tabletId,
@@ -135,25 +230,41 @@ ui64 GetCompactionsCount(
auto response = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetCompactTableStatsResult>(handle);
UNIT_ASSERT(response->Record.HasBackgroundCompactionRequests());
- return response->Record.GetBackgroundCompactionRequests();
+ return TCompactionStats(response->Record);
}
-ui64 GetCompactionsCount(
+TCompactionStats GetCompactionStats(
TTestActorRuntime &runtime,
const NKikimrTxDataShard::TEvGetInfoResponse::TUserTable& userTable,
const TVector<ui64>& shards,
ui64 ownerId)
{
- ui64 compactionsCount = 0;
+ TCompactionStats stats;
+
for (auto shard: shards) {
- compactionsCount += GetCompactionsCount(
+ stats.Update(GetCompactionStats(
runtime,
userTable,
shard,
- ownerId);
+ ownerId));
}
- return compactionsCount;
+ return stats;
+}
+
+TCompactionStats GetCompactionStats(
+ TTestActorRuntime &runtime,
+ const TString& path,
+ ui64 schemeshardId = TTestTxConfig::SchemeShard)
+{
+ auto info = GetPathInfo(runtime, path.c_str(), schemeshardId);
+ UNIT_ASSERT(!info.Shards.empty());
+
+ return GetCompactionStats(
+ runtime,
+ info.UserTable,
+ info.Shards,
+ info.OwnerId);
}
void CheckShardCompacted(
@@ -163,11 +274,11 @@ void CheckShardCompacted(
ui64 ownerId,
bool shouldCompacted = true)
{
- auto count = GetCompactionsCount(
+ auto count = GetCompactionStats(
runtime,
userTable,
tabletId,
- ownerId);
+ ownerId).BackgroundRequestCount;
if (shouldCompacted) {
UNIT_ASSERT(count > 0);
@@ -176,39 +287,30 @@ void CheckShardCompacted(
}
}
-void CheckNoCompactions(
+void CheckNoCompactionsInPeriod(
TTestActorRuntime &runtime,
TTestEnv& env,
- ui64 schemeshardId,
- const TString& path)
+ const TString& path,
+ ui64 schemeshardId = TTestTxConfig::SchemeShard)
{
- auto description = DescribePrivatePath(runtime, schemeshardId, path, true, true);
- TVector<ui64> shards;
- for (auto &part : description.GetPathDescription().GetTablePartitions())
- shards.push_back(part.GetDatashardId());
-
- UNIT_ASSERT(!shards.empty());
+ auto info = GetPathInfo(runtime, path.c_str(), schemeshardId);
+ UNIT_ASSERT(!info.Shards.empty());
env.SimulateSleep(runtime, TDuration::Seconds(30));
- auto [tables, ownerId] = GetTables(runtime, shards.at(0));
-
- auto userTableName = TStringBuf(path).RNextTok('/');
- const auto& userTable = tables[userTableName];
-
- auto count1 = GetCompactionsCount(
+ auto count1 = GetCompactionStats(
runtime,
- userTable,
- shards,
- ownerId);
+ info.UserTable,
+ info.Shards,
+ info.OwnerId).BackgroundRequestCount;
env.SimulateSleep(runtime, TDuration::Seconds(30));
- auto count2 = GetCompactionsCount(
+ auto count2 = GetCompactionStats(
runtime,
- userTable,
- shards,
- ownerId);
+ info.UserTable,
+ info.Shards,
+ info.OwnerId).BackgroundRequestCount;
UNIT_ASSERT_VALUES_EQUAL(count1, count2);
}
@@ -221,29 +323,14 @@ void TestBackgroundCompaction(
{
ui64 txId = 1000;
- TestCreateTable(runtime, ++txId, "/MyRoot",
- R"____(
- Name: "Simple"
- Columns { Name: "key1" Type: "Uint32"}
- Columns { Name: "Value" Type: "Utf8"}
- KeyColumnNames: ["key1"]
- UniformPartitionsCount: 2
- )____");
- env.TestWaitNotification(runtime, txId);
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 2, txId);
+ auto info = GetPathInfo(runtime, "/MyRoot/Simple");
enableBackgroundCompactionFunc(runtime, env);
-
- auto description = DescribePrivatePath(runtime, "/MyRoot/Simple", true, true);
- TVector<ui64> shards;
- for (auto &part : description.GetPathDescription().GetTablePartitions())
- shards.push_back(part.GetDatashardId());
-
env.SimulateSleep(runtime, TDuration::Seconds(30));
- auto [tables, ownerId] = GetTables(runtime, shards.at(0));
-
- for (auto shard: shards)
- CheckShardCompacted(runtime, tables["Simple"], shard, ownerId);
+ for (auto shard: info.Shards)
+ CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId);
}
ui64 TestServerless(
@@ -329,17 +416,13 @@ ui64 TestServerless(
// turn on background compaction
EnableBackgroundCompactionViaRestart(runtime, env, schemeshardId, enableServerless);
- auto description = DescribePrivatePath(runtime, schemeshardId, "/MyRoot/User/Simple", true, true);
- TVector<ui64> shards;
- for (auto &part : description.GetPathDescription().GetTablePartitions())
- shards.push_back(part.GetDatashardId());
+ auto info = GetPathInfo(runtime, "/MyRoot/User/Simple", schemeshardId);
+ UNIT_ASSERT(!info.Shards.empty());
env.SimulateSleep(runtime, TDuration::Seconds(30));
- auto [tables, ownerId] = GetTables(runtime, shards.at(0));
-
- for (auto shard: shards)
- CheckShardCompacted(runtime, tables["Simple"], shard, ownerId, enableServerless);
+ for (auto shard: info.Shards)
+ CheckShardCompacted(runtime, info.UserTable, shard, info.OwnerId, enableServerless);
return schemeshardId;
}
@@ -402,7 +485,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
// some time to finish compactions in progress
env.SimulateSleep(runtime, TDuration::Seconds(30));
- CheckNoCompactions(runtime, env, TTestTxConfig::SchemeShard, "/MyRoot/Simple");
+ CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple");
}
Y_UNIT_TEST(ShouldNotCompactServerless) {
@@ -444,7 +527,88 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
// some time to finish compactions in progress
env.SimulateSleep(runtime, TDuration::Seconds(30));
- CheckNoCompactions(runtime, env, schemeshardId, "/MyRoot/User/Simple");
+ CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/User/Simple", schemeshardId);
+ }
+
+ Y_UNIT_TEST(SchemeshardShouldNotCompactBackups) {
+ // enabled via schemeshard restart
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ // disable for the case, when compaction is enabled by default
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false);
+
+ ui64 txId = 1000;
+
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 2, txId);
+
+ // backup table
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ CopyFromTable: "/MyRoot/Simple"
+ IsBackup: true
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+
+ CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");
+ UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL);
+ }
+
+ Y_UNIT_TEST(SchemeshardShouldNotCompactBorrowed) {
+ // enabled via schemeshard restart
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+ //runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_TRACE);
+
+ // disable for the case, when compaction is enabled by default
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, false);
+
+ // capture original observer func by setting dummy one
+ auto originalObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>&) {
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+ // now set our observer backed up by original
+ runtime.SetObserverFunc([&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::EvCompactBorrowed:
+ // we should not compact borrowed to check that background compaction
+ // will not compact shard with borrowed parts as well
+ Y_UNUSED(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ default:
+ return originalObserver(runtime, ev);
+ }
+ });
+
+ ui64 txId = 1000;
+
+ // note that we create 1-sharded table to avoid complications
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ // copy table
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "CopyTable"
+ CopyFromTable: "/MyRoot/Simple"
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+
+ CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/CopyTable");
+ UNIT_ASSERT_VALUES_EQUAL(GetCompactionStats(runtime, "/MyRoot/CopyTable").BackgroundRequestCount, 0UL);
+
+ // original table should not be compacted as well
+ CheckNoCompactionsInPeriod(runtime, env, "/MyRoot/Simple");
}
};